Logo ROOT   6.21/01
Reference Guide
TProof.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2 // Author: Fons Rademakers 13/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 /**
12  \defgroup proof PROOF
13 
14  Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
15 
16 */
17 
18 /**
19  \defgroup proofkernel PROOF kernel Libraries
20  \ingroup proof
21 
22  The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
23  the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
24  and handling of results.
25 
26 */
27 
28 /** \class TProof
29 \ingroup proofkernel
30 
31 This class controls a Parallel ROOT Facility, PROOF, cluster.
32 It fires the worker servers, it keeps track of how many workers are
33 running, it keeps track of the workers running status, it broadcasts
34 messages to all workers, it collects results, etc.
35 
36 */
37 
38 #include <stdlib.h>
39 #include <fcntl.h>
40 #include <errno.h>
41 #ifdef WIN32
42 # include <io.h>
43 # include <sys/stat.h>
44 # include <sys/types.h>
45 # include "snprintf.h"
46 #else
47 # include <unistd.h>
48 #endif
49 #include <vector>
50 
51 #include "RConfigure.h"
52 #include "Riostream.h"
53 #include "Getline.h"
54 #include "TBrowser.h"
55 #include "TChain.h"
56 #include "TCondor.h"
57 #include "TDSet.h"
58 #include "TError.h"
59 #include "TEnv.h"
60 #include "TEntryList.h"
61 #include "TEventList.h"
62 #include "TFile.h"
63 #include "TFileInfo.h"
64 #include "TFTP.h"
65 #include "THashList.h"
66 #include "TInterpreter.h"
67 #include "TKey.h"
68 #include "TMap.h"
69 #include "TMath.h"
70 #include "TMessage.h"
71 #include "TMonitor.h"
72 #include "TObjArray.h"
73 #include "TObjString.h"
74 #include "TParameter.h"
75 #include "TProof.h"
76 #include "TProofNodeInfo.h"
77 #include "TProofOutputFile.h"
78 #include "TVirtualProofPlayer.h"
79 #include "TVirtualPacketizer.h"
80 #include "TProofServ.h"
81 #include "TPluginManager.h"
82 #include "TQueryResult.h"
83 #include "TRandom.h"
84 #include "TRegexp.h"
85 #include "TROOT.h"
86 #include "TSlave.h"
87 #include "TSocket.h"
88 #include "TSortedList.h"
89 #include "TSystem.h"
90 #include "TTree.h"
91 #include "TUrl.h"
92 #include "TFileCollection.h"
93 #include "TDataSetManager.h"
94 #include "TDataSetManagerFile.h"
95 #include "TMacro.h"
96 #include "TSelector.h"
97 #include "TPRegexp.h"
98 #include "TPackMgr.h"
99 
100 #include <mutex>
101 
103 
104 // Rotating indicator
105 char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
106 
107 TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
108 TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
109 
111 
112 //----- PROOF Interrupt signal handler -----------------------------------------
113 ////////////////////////////////////////////////////////////////////////////////
114 /// TProof interrupt handler.
115 
117 {
118  if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
119 
120  // Cannot ask the user : abort any remote processing
122 
123  } else {
124  // Real stop or request to switch to asynchronous?
125  const char *a = 0;
126  if (fProof->GetRemoteProtocol() < 22) {
127  a = Getline("\nSwitch to asynchronous mode not supported remotely:"
128  "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
129  } else {
130  a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
131  " any other key to continue: ");
132  }
133  if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
134 
135  Info("Notify","Processing interrupt signal ... %c", a[0]);
136 
137  // Stop or abort any remote processing
138  Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
139  fProof->StopProcess(abort);
140 
141  } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
142  // Stop any remote processing
144  }
145  }
146 
147  return kTRUE;
148 }
149 
150 //----- Input handler for messages from TProofServ -----------------------------
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Constructor
153 
155  : TFileHandler(s->GetDescriptor(),1),
156  fSocket(s), fProof(p)
157 {
158 }
159 
160 ////////////////////////////////////////////////////////////////////////////////
161 /// Handle input
162 
164 {
166  return kTRUE;
167 }
168 
169 
170 //------------------------------------------------------------------------------
171 
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 /// Used to sort slaveinfos by ordinal.
176 
178 {
179  if (!obj) return 1;
180 
181  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
182 
183  if (!si) return fOrdinal.CompareTo(obj->GetName());
184 
185  const char *myord = GetOrdinal();
186  const char *otherord = si->GetOrdinal();
187  while (myord && otherord) {
188  Int_t myval = atoi(myord);
189  Int_t otherval = atoi(otherord);
190  if (myval < otherval) return 1;
191  if (myval > otherval) return -1;
192  myord = strchr(myord, '.');
193  if (myord) myord++;
194  otherord = strchr(otherord, '.');
195  if (otherord) otherord++;
196  }
197  if (myord) return -1;
198  if (otherord) return 1;
199  return 0;
200 }
201 
202 ////////////////////////////////////////////////////////////////////////////////
203 /// Used to compare slaveinfos by ordinal.
204 
206 {
207  if (!obj) return kFALSE;
208  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
209  if (!si) return kFALSE;
210  return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
211 }
212 
213 ////////////////////////////////////////////////////////////////////////////////
214 /// Print slave info. If opt = "active" print only the active
215 /// slaves, if opt="notactive" print only the not active slaves,
216 /// if opt = "bad" print only the bad slaves, else
217 /// print all slaves.
218 
219 void TSlaveInfo::Print(Option_t *opt) const
220 {
221  TString stat = fStatus == kActive ? "active" :
222  fStatus == kBad ? "bad" :
223  "not active";
224 
225  Bool_t newfmt = kFALSE;
226  TString oo(opt);
227  if (oo.Contains("N")) {
228  newfmt = kTRUE;
229  oo.ReplaceAll("N","");
230  }
231  if (oo == "active" && fStatus != kActive) return;
232  if (oo == "notactive" && fStatus != kNotActive) return;
233  if (oo == "bad" && fStatus != kBad) return;
234 
235  if (newfmt) {
236  TString msd, si, datadir;
237  if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
238  if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
239  if (fSysInfo.fCpus > 0) {
240  si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
242  } else {
243  si.Form("| %s", fHostName.Data());
244  }
245  Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
246 
247  } else {
248  TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
249 
250  std::cout << "Slave: " << fOrdinal
251  << " hostname: " << fHostName
252  << " msd: " << msd
253  << " perf index: " << fPerfIndex
254  << " " << stat
255  << std::endl;
256  }
257 }
258 
259 ////////////////////////////////////////////////////////////////////////////////
260 /// Setter for fSysInfo
261 
263 {
264  fSysInfo.fOS = si.fOS; // OS
265  fSysInfo.fModel = si.fModel; // computer model
266  fSysInfo.fCpuType = si.fCpuType; // type of cpu
267  fSysInfo.fCpus = si.fCpus; // number of cpus
268  fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
269  fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
270  fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
271  fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
272 }
273 
275 
276 //------------------------------------------------------------------------------
277 
278 ////////////////////////////////////////////////////////////////////////////////
279 /// Destructor
280 
282 {
283  // Just delete the list, the objects are owned by other list
284  if (fWorkers) {
287  }
288 }
289 ////////////////////////////////////////////////////////////////////////////////
290 /// Increase number of already merged workers by 1
291 
293 {
294  if (AreAllWorkersMerged())
295  Error("SetMergedWorker", "all workers have been already merged before!");
296  else
297  fMergedWorkers++;
298 }
299 
300 ////////////////////////////////////////////////////////////////////////////////
301 /// Add new worker to the list of workers to be merged by this merger
302 
304 {
305  if (!fWorkers)
306  fWorkers = new TList();
307  if (fWorkersToMerge == fWorkers->GetSize()) {
308  Error("AddWorker", "all workers have been already assigned to this merger");
309  return;
310  }
311  fWorkers->Add(sl);
312 }
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 /// Return if merger has already merged all workers, i.e. if it has finished its merging job
316 
318 {
319  return (fWorkersToMerge == fMergedWorkers);
320 }
321 
322 ////////////////////////////////////////////////////////////////////////////////
323 /// Return if the determined number of workers has been already assigned to this merger
324 
326 {
327  if (!fWorkers)
328  return kFALSE;
329 
330  return (fWorkers->GetSize() == fWorkersToMerge);
331 }
332 
333 ////////////////////////////////////////////////////////////////////////////////
334 /// This is a private API function.
335 /// It checks whether the connection string contains a PoD cluster protocol.
336 /// If it does, then the connection string will be changed to reflect
337 /// a real PROOF connection string for a PROOF cluster managed by PoD.
338 /// PoD: http://pod.gsi.de .
339 /// Return -1 if the PoD request failed; return 0 otherwise.
340 
341 static Int_t PoDCheckUrl(TString *_cluster)
342 {
343  if ( !_cluster )
344  return 0;
345 
346  // trim spaces from both sides of the string
347  *_cluster = _cluster->Strip( TString::kBoth );
348  // PoD protocol string
349  const TString pod_prot("pod");
350 
351  // URL test
352  // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
353  TUrl url( _cluster->Data() );
354  if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
355  return 0;
356 
357  // PoD cluster is used
358  // call pod-info in a batch mode (-b).
359  // pod-info will find either a local PoD cluster or
360  // a remote one, manged by pod-remote.
361  *_cluster = gSystem->GetFromPipe("pod-info -c -b");
362  if( 0 == _cluster->Length() ) {
363  Error("PoDCheckUrl", "PoD server is not running");
364  return -1;
365  }
366  return 0;
367 }
368 
369 ////////////////////////////////////////////////////////////////////////////////
370 /// Create a PROOF environment. Starting PROOF involves either connecting
371 /// to a master server, which in turn will start a set of slave servers, or
372 /// directly starting as master server (if master = ""). Masterurl is of
373 /// the form: [proof[s]://]host[:port]. Conffile is the name of the config
374 /// file describing the remote PROOF cluster (this argument allows you to
375 /// describe different cluster configurations).
376 /// The default is proof.conf. Confdir is the directory where the config
377 /// file and other PROOF related files are (like motd and noproof files).
378 /// Loglevel is the log level (default = 1). User specified custom config
379 /// files will be first looked for in $HOME/.conffile.
380 
381 TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
382  Int_t loglevel, const char *alias, TProofMgr *mgr)
383  : fUrl(masterurl)
384 {
385  // Default initializations
386  InitMembers();
387 
388  // This may be needed during init
389  fManager = mgr;
390 
391  // Default server type
393 
394  // Default query mode
395  fQueryMode = kSync;
396 
397  // Parse the main URL, adjusting the missing fields and setting the relevant
398  // bits
401 
402  // Protocol and Host
403  if (!masterurl || strlen(masterurl) <= 0) {
404  fUrl.SetProtocol("proof");
405  fUrl.SetHost("__master__");
406  } else if (!(strstr(masterurl, "://"))) {
407  fUrl.SetProtocol("proof");
408  }
409  // Port
410  if (fUrl.GetPort() == TUrl(" ").GetPort())
411  fUrl.SetPort(TUrl("proof:// ").GetPort());
412 
413  // Make sure to store the FQDN, so to get a solid reference for subsequent checks
414  if (!strcmp(fUrl.GetHost(), "__master__"))
415  fMaster = fUrl.GetHost();
416  else if (!strlen(fUrl.GetHost()))
417  fMaster = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
418  else
419  fMaster = gSystem->GetHostByName(fUrl.GetHost()).GetHostName();
420 
421  // Server type
422  if (strlen(fUrl.GetOptions()) > 0) {
423  TString opts(fUrl.GetOptions());
424  if (!(strncmp(fUrl.GetOptions(),"std",3))) {
426  opts.Remove(0,3);
427  fUrl.SetOptions(opts.Data());
428  } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
430  opts.Remove(0,4);
431  fUrl.SetOptions(opts.Data());
432  }
433  }
434 
435  // Instance type
439  if (fMaster == "__master__") {
440  fMasterServ = kTRUE;
443  } else if (fMaster == "prooflite") {
444  // Client and master are merged
445  fMasterServ = kTRUE;
447  }
448  // Flag that we are a client
450  if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
451 
452  Init(masterurl, conffile, confdir, loglevel, alias);
453 
454  // If the user was not set, get it from the master
455  if (strlen(fUrl.GetUser()) <= 0) {
456  TString usr, emsg;
457  if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
458  TObjString *os = fMacroLog.GetLineWith("const char");
459  if (os) {
460  Ssiz_t fst = os->GetString().First('\"');
461  Ssiz_t lst = os->GetString().Last('\"');
462  usr = os->GetString()(fst+1, lst-fst-1);
463  } else {
464  emsg = "could not find 'const char *' string in macro log";
465  }
466  } else {
467  emsg = "could not retrieve user info";
468  }
469  if (!emsg.IsNull()) {
470  // Get user logon name
472  if (pw) {
473  usr = pw->fUser;
474  delete pw;
475  }
476  Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
477  }
478  // Set the user name in the main URL
479  fUrl.SetUser(usr.Data());
480  }
481 
482  // If called by a manager, make sure it stays in last position
483  // for cleaning
484  if (mgr) {
486  gROOT->GetListOfSockets()->Remove(mgr);
487  gROOT->GetListOfSockets()->Add(mgr);
488  }
489 
490  // Old-style server type: we add this to the list and set the global pointer
492  if (!gROOT->GetListOfProofs()->FindObject(this))
493  gROOT->GetListOfProofs()->Add(this);
494 
495  // Still needed by the packetizers: needs to be changed
496  gProof = this;
497 }
498 
499 ////////////////////////////////////////////////////////////////////////////////
500 /// Protected constructor to be used by classes deriving from TProof
501 /// (they have to call Init themselves and override StartSlaves
502 /// appropriately).
503 ///
504 /// This constructor simply closes any previous gProof and sets gProof
505 /// to this instance.
506 
507 TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
508 {
509  // Default initializations
510  InitMembers();
511 
512  if (!gROOT->GetListOfProofs()->FindObject(this))
513  gROOT->GetListOfProofs()->Add(this);
514 
515  gProof = this;
516 }
517 
518 ////////////////////////////////////////////////////////////////////////////////
519 /// Default initializations
520 
522 {
523  fValid = kFALSE;
524  fTty = kFALSE;
525  fRecvMessages = 0;
526  fSlaveInfo = 0;
530  fLastPollWorkers_s = -1;
531  fActiveSlaves = 0;
532  fInactiveSlaves = 0;
533  fUniqueSlaves = 0;
534  fAllUniqueSlaves = 0;
535  fNonUniqueMasters = 0;
536  fActiveMonitor = 0;
537  fUniqueMonitor = 0;
538  fAllUniqueMonitor = 0;
539  fCurrentMonitor = 0;
540  fBytesRead = 0;
541  fRealTime = 0;
542  fCpuTime = 0;
543  fIntHandler = 0;
544  fProgressDialog = 0;
547  fPlayer = 0;
548  fFeedback = 0;
549  fChains = 0;
550  fDSet = 0;
551  fNotIdle = 0;
552  fSync = kTRUE;
554  fIsWaiting = kFALSE;
555  fRedirLog = kFALSE;
556  fLogFileW = 0;
557  fLogFileR = 0;
560  fMacroLog.SetName("ProofLogMacro");
561 
562  fWaitingSlaves = 0;
563  fQueries = 0;
564  fOtherQueries = 0;
565  fDrawQueries = 0;
566  fMaxDrawQueries = 1;
567  fSeqNum = 0;
568 
569  fSessionID = -1;
570  fEndMaster = kFALSE;
571 
572  fPackMgr = 0;
574 
575  fInputData = 0;
576 
577  fPrintProgress = 0;
578 
579  fLoadedMacros = 0;
580 
581  fProtocol = -1;
582  fSlaves = 0;
584  fBadSlaves = 0;
585  fAllMonitor = 0;
586  fDataReady = kFALSE;
587  fBytesReady = 0;
588  fTotalBytes = 0;
589  fAvailablePackages = 0;
590  fEnabledPackages = 0;
591  fRunningDSets = 0;
592 
593  fCollectTimeout = -1;
594 
595  fManager = 0;
596  fQueryMode = kSync;
598 
601  fMergers = 0;
602  fMergersCount = -1;
604  fWorkersToMerge = 0;
606 
607  fPerfTree = "";
608 
609  fWrksOutputReady = 0;
610 
611  fSelector = 0;
612 
613  fPrepTime = 0.;
614 
615  // Check if the user defined a list of environment variables to send over:
616  // include them into the dedicated list
617  if (gSystem->Getenv("PROOF_ENVVARS")) {
618  TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
619  Int_t from = 0;
620  while (envs.Tokenize(env, from, ",")) {
621  if (!env.IsNull()) {
622  if (!gSystem->Getenv(env)) {
623  Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
624  } else {
625  if (!envsfound.IsNull()) envsfound += ",";
626  envsfound += env;
627  TProof::DelEnvVar(env);
628  TProof::AddEnvVar(env, gSystem->Getenv(env));
629  }
630  }
631  }
632  if (envsfound.IsNull()) {
633  Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
634  } else {
635  Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
636  }
637  }
638 
639  // Done
640  return;
641 }
642 
643 ////////////////////////////////////////////////////////////////////////////////
644 /// Clean up PROOF environment.
645 
647 {
648  if (fChains) {
649  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
650  // remove "chain" from list
651  chain->SetProof(0);
652  RemoveChain(chain);
653  }
654  }
655 
656  // remove links to packages enabled on the client
657  if (TestBit(TProof::kIsClient)) {
658  // iterate over all packages
659  TList *epl = fPackMgr->GetListOfEnabled();
660  TIter nxp(epl);
661  while (TObjString *pck = (TObjString *)(nxp())) {
662  FileStat_t stat;
663  if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
664  // check if symlink, if so unlink
665  // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
666  // existing file or to a directory, but if fIsLink is true the symlink exists
667  if (stat.fIsLink)
668  gSystem->Unlink(pck->String());
669  }
670  }
671  }
672 
673  Close();
699  if (fWrksOutputReady) {
701  delete fWrksOutputReady;
702  }
703 
704  // remove file with redirected logs
705  if (TestBit(TProof::kIsClient)) {
706  if (fLogFileR)
707  fclose(fLogFileR);
708  if (fLogFileW)
709  fclose(fLogFileW);
710  if (fLogFileName.Length() > 0)
712  }
713 
714  // Remove for the global list
715  gROOT->GetListOfProofs()->Remove(this);
716  // ... and from the manager list
717  if (fManager && fManager->IsValid())
718  fManager->DiscardSession(this);
719 
720  if (gProof && gProof == this) {
721  // Set previous as default
722  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
723  while ((gProof = (TProof *)pvp())) {
725  break;
726  }
727  }
728 
729  // For those interested in our destruction ...
730  Emit("~TProof()");
731  Emit("CloseWindow()");
732 }
733 
734 ////////////////////////////////////////////////////////////////////////////////
735 /// Start the PROOF environment. Starting PROOF involves either connecting
736 /// to a master server, which in turn will start a set of slave servers, or
737 /// directly starting as master server (if master = ""). For a description
738 /// of the arguments see the TProof ctor. Returns the number of started
739 /// master or slave servers, returns 0 in case of error, in which case
740 /// fValid remains false.
741 
742 Int_t TProof::Init(const char *, const char *conffile,
743  const char *confdir, Int_t loglevel, const char *alias)
744 {
746 
747  fValid = kFALSE;
748 
749  // Connected to terminal?
750  fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
751 
752  // If in attach mode, options is filled with additional info
753  Bool_t attach = kFALSE;
754  if (strlen(fUrl.GetOptions()) > 0) {
755  attach = kTRUE;
756  // A flag from the GUI
757  TString opts = fUrl.GetOptions();
758  if (opts.Contains("GUI")) {
760  opts.Remove(opts.Index("GUI"));
761  fUrl.SetOptions(opts);
762  }
763  }
764 
765  if (TestBit(TProof::kIsMaster)) {
766  // Fill default conf file and conf dir
767  if (!conffile || !conffile[0])
769  if (!confdir || !confdir[0])
771  // The group; the client receives it in the kPROOF_SESSIONTAG message
773  } else {
774  fConfDir = confdir;
775  fConfFile = conffile;
776  }
777 
778  // Analysise the conffile field
779  if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
781 
783  fLogLevel = loglevel;
786  fImage = fMasterServ ? "" : "<local>";
787  fIntHandler = 0;
788  fStatus = 0;
789  fRecvMessages = new TList;
791  fSlaveInfo = 0;
792  fChains = new TList;
793  fAvailablePackages = 0;
794  fEnabledPackages = 0;
795  fRunningDSets = 0;
797  fInputData = 0;
799  fPrintProgress = 0;
800 
801  // Timeout for some collect actions
802  fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
803 
804  // Should the workers be started dynamically; default: no
805  fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
806 
807  // Default entry point for the data pool is the master
809  fDataPoolUrl.Form("root://%s", fMaster.Data());
810  else
811  fDataPoolUrl = "";
812 
813  fProgressDialog = 0;
815 
816  // Default alias is the master name
817  TString al = (alias) ? alias : fMaster.Data();
818  SetAlias(al);
819 
820  // Client logging of messages from the master and slaves
821  fRedirLog = kFALSE;
822  if (TestBit(TProof::kIsClient)) {
823  fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
824  if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
825  Error("Init", "could not create temporary logfile");
826  if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
827  Error("Init", "could not open temp logfile for reading");
828  }
830 
831  // Status of cluster
832  fNotIdle = 0;
833  // Query type
834  fSync = (attach) ? kFALSE : kTRUE;
835  // Not enqueued
836  fIsWaiting = kFALSE;
837 
838  // Counters
839  fBytesRead = 0;
840  fRealTime = 0;
841  fCpuTime = 0;
842 
843  // List of queries
844  fQueries = 0;
845  fOtherQueries = 0;
846  fDrawQueries = 0;
847  fMaxDrawQueries = 1;
848  fSeqNum = 0;
849 
850  // Remote ID of the session
851  fSessionID = -1;
852 
853  // Part of active query
854  fWaitingSlaves = 0;
855 
856  // Make remote PROOF player
857  fPlayer = 0;
858  MakePlayer();
859 
860  fFeedback = new TList;
861  fFeedback->SetOwner();
862  fFeedback->SetName("FeedbackList");
864 
865  // sort slaves by descending performance index
867  fActiveSlaves = new TList;
868  fInactiveSlaves = new TList;
869  fUniqueSlaves = new TList;
870  fAllUniqueSlaves = new TList;
871  fNonUniqueMasters = new TList;
872  fBadSlaves = new TList;
873  fAllMonitor = new TMonitor;
874  fActiveMonitor = new TMonitor;
875  fUniqueMonitor = new TMonitor;
877  fCurrentMonitor = 0;
878 
881 
882  fLoadedMacros = 0;
883  fPackMgr = 0;
884 
885  // Enable optimized sending of streamer infos to use embedded backward/forward
886  // compatibility support between different ROOT versions and different versions of
887  // users classes
888  Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
889  if (enableSchemaEvolution) {
891  } else {
892  Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
893  }
894 
895  if (IsMaster()) {
896  // to make UploadPackage() method work on the master as well.
898  } else {
899 
900  TString sandbox;
901  if (GetSandbox(sandbox, kTRUE) != 0) {
902  Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
903  return 0;
904  }
905 
906  // Package Dir
907  TString packdir = gEnv->GetValue("Proof.PackageDir", "");
908  if (packdir.IsNull())
909  packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
910  if (AssertPath(packdir, kTRUE) != 0) {
911  Error("Init", "failure asserting directory %s", packdir.Data());
912  return 0;
913  }
914  fPackMgr = new TPackMgr(packdir);
915  if (gDebug > 0)
916  Info("Init", "package directory set to %s", packdir.Data());
917  }
918 
919  if (!IsMaster()) {
920  // List of directories where to look for global packages
921  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
922  TProofServ::ResolveKeywords(globpack);
923  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
924  if (gDebug > 0)
925  Info("Init", " %d global package directories registered", nglb);
926  }
927 
928  // Master may want dynamic startup
929  if (fDynamicStartup) {
930  if (!IsMaster()) {
931  // If on client - start the master
932  if (!StartSlaves(attach))
933  return 0;
934  }
935  } else {
936 
937  // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
938  // result retrieving, ...)
939  Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
940  if (!IsMaster() || !masterOnly) {
941  // Start slaves (the old, static, per-session way)
942  if (!StartSlaves(attach))
943  return 0;
944  // Client: Is Master in dynamic startup mode?
945  if (!IsMaster()) {
946  Int_t dyn = 0;
947  GetRC("Proof.DynamicStartup", dyn);
948  if (dyn != 0) fDynamicStartup = kTRUE;
949  }
950  }
951  }
952  // we are now properly initialized
953  fValid = kTRUE;
954 
955  // De-activate monitor (will be activated in Collect)
957 
958  // By default go into parallel mode
959  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
960  TNamed *n = 0;
961  if (TProof::GetEnvVars() &&
962  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
963  TString s(n->GetTitle());
964  if (s.IsDigit()) nwrk = s.Atoi();
965  }
966  GoParallel(nwrk, attach);
967 
968  // Send relevant initial state to slaves
969  if (!attach)
971  else if (!IsIdle())
972  // redirect log
973  fRedirLog = kTRUE;
974 
975  // Done at this point, the alias will be communicated to the coordinator, if any
977  SetAlias(al);
978 
979  SetActive(kFALSE);
980 
981  if (IsValid()) {
982 
983  // Activate input handler
985 
987  gROOT->GetListOfSockets()->Add(this);
988  }
989 
990  AskParallel();
991 
992  return fActiveSlaves->GetSize();
993 }
994 
995 ////////////////////////////////////////////////////////////////////////////////
996 /// Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
997 /// Use the existing setting or the default if nothing is found.
998 /// If 'assert' is kTRUE, make also sure that the path exists.
999 /// Return 0 on success, -1 on failure
1000 
1001 Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1002 {
1003  // Get it from 'rc', if defined
1004  if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1005  // Or use the default 'rc'
1006  if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1007  // If nothing found , use the default
1008  if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1009  // Expand special settings
1010  if (sb == ".") {
1011  sb = gSystem->pwd();
1012  } else if (sb == "..") {
1013  sb = gSystem->GetDirName(gSystem->pwd());
1014  }
1015  gSystem->ExpandPathName(sb);
1016 
1017  // Assert the path, if required
1018  if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1019  // Done
1020  return 0;
1021 }
1022 
1023 ////////////////////////////////////////////////////////////////////////////////
1024 /// The config file field may contain special instructions which need to be
1025 /// parsed at the beginning, e.g. for debug runs with valgrind.
1026 /// Several options can be given separated by a ','
1027 
1028 void TProof::ParseConfigField(const char *config)
1029 {
1030  TString sconf(config), opt;
1031  Ssiz_t from = 0;
1032  Bool_t cpuPin = kFALSE;
1033 
1034  // Analysise the field
1035  const char *cq = (IsLite()) ? "\"" : "";
1036  while (sconf.Tokenize(opt, from, ",")) {
1037  if (opt.IsNull()) continue;
1038 
1039  if (opt.BeginsWith("valgrind")) {
1040  // Any existing valgrind setting? User can give full settings, which we fully respect,
1041  // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1042  // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1043  // will add option "--time-stamp --leak-check=full" to our default options
1044  TString mst, top, sub, wrk, all;
1045  TList *envs = fgProofEnvList;
1046  TNamed *n = 0;
1047  if (envs) {
1048  if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1049  all = n->GetTitle();
1050  if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1051  mst = n->GetTitle();
1052  if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1053  top = n->GetTitle();
1054  if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1055  sub = n->GetTitle();
1056  if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1057  wrk = n->GetTitle();
1058  }
1059  if (all != "" && mst == "") mst = all;
1060  if (all != "" && top == "") top = all;
1061  if (all != "" && sub == "") sub = all;
1062  if (all != "" && wrk == "") wrk = all;
1063  if (all != "" && all.BeginsWith("valgrind_opts:")) {
1064  // The field is used to add an option Reset the setting
1065  Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1066  " must be set again for next run , if any");
1067  TProof::DelEnvVar("PROOF_WRAPPERCMD");
1068  }
1069  TString var, cmd;
1070  cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1071  TString mstlab("NO"), wrklab("NO");
1072  Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1073  !opt.Contains("topmaster") && !opt.Contains("submaster")))
1074  ? kTRUE : kFALSE;
1075  if (doMaster) {
1076  if (!IsLite()) {
1077  // Check if we have to add a var
1078  if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1079  mst.ReplaceAll("valgrind_opts:","");
1080  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1081  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1082  mstlab = "YES";
1083  } else if (mst != "") {
1084  mstlab = "YES";
1085  }
1086  } else {
1087  if (opt.Contains("master")) {
1088  Warning("ParseConfigField",
1089  "master valgrinding does not make sense for PROOF-Lite: ignoring");
1090  opt.ReplaceAll("master", "");
1091  if (!opt.Contains("workers")) return;
1092  }
1093  if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1094  }
1095  }
1096  if (opt.Contains("topmaster")) {
1097  // Check if we have to add a var
1098  if (top == "" || top.BeginsWith("valgrind_opts:")) {
1099  top.ReplaceAll("valgrind_opts:","");
1100  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1101  TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1102  mstlab = "YES";
1103  } else if (top != "") {
1104  mstlab = "YES";
1105  }
1106  }
1107  if (opt.Contains("submaster")) {
1108  // Check if we have to add a var
1109  if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1110  sub.ReplaceAll("valgrind_opts:","");
1111  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1112  TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1113  mstlab = "YES";
1114  } else if (sub != "") {
1115  mstlab = "YES";
1116  }
1117  }
1118  if (opt.Contains("=workers") || opt.Contains("+workers")) {
1119  // Check if we have to add a var
1120  if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1121  wrk.ReplaceAll("valgrind_opts:","");
1122  var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1123  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1124  TString nwrks("2");
1125  Int_t inw = opt.Index('#');
1126  if (inw != kNPOS) {
1127  nwrks = opt(inw+1, opt.Length());
1128  if (!nwrks.IsDigit()) nwrks = "2";
1129  }
1130  // Set the relevant variables
1131  if (!IsLite()) {
1132  TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1133  } else {
1134  gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1135  }
1136  wrklab = nwrks;
1137  // Register the additional worker log in the session file
1138  // (for the master this is done automatically)
1139  TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1140  } else if (wrk != "") {
1141  wrklab = "ALL";
1142  }
1143  }
1144  // Increase the relevant timeouts
1145  if (!IsLite()) {
1146  TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1147  gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1148  } else {
1149  gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1150  }
1151  // Warn for slowness
1152  Printf(" ");
1153  if (!IsLite()) {
1154  Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1155  } else {
1156  Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1157  }
1158  Printf(" ---> Please be patient: startup may be VERY slow ...");
1159  Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1160  Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1161  Printf(" ");
1162 
1163  } else if (opt.BeginsWith("igprof-pp")) {
1164 
1165  // IgProf profiling on master and worker. PROOF does not set the
1166  // environment for you: proper environment variables (like PATH and
1167  // LD_LIBRARY_PATH) should be set externally
1168 
1169  Printf("*** Requested IgProf performance profiling ***");
1170  TString addLogExt = "__igprof.pp__.log";
1171  TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1172  TString tmp;
1173 
1174  if (IsLite()) {
1175  addLogFmt.Append("\"");
1176  addLogFmt.Prepend("\"");
1177  }
1178 
1179  tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1180  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1181 
1182  tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1183  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1184 
1185  TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1186 
1187  } else if (opt.BeginsWith("cpupin=")) {
1188  // Enable CPU pinning. Takes as argument the list of processor IDs
1189  // that will be used in order. Processor IDs are numbered from 0,
1190  // use likwid to see how they are organized. A possible parameter
1191  // format would be:
1192  //
1193  // cpupin=3+4+0+9+10+22+7
1194  //
1195  // Only the specified processor IDs will be used in a round-robin
1196  // fashion, dealing with the fact that you can request more workers
1197  // than the number of processor IDs you have specified.
1198  //
1199  // To use all available processors in their order:
1200  //
1201  // cpupin=*
1202 
1203  opt.Remove(0, 7);
1204 
1205  // Remove any char which is neither a number nor a plus '+'
1206  for (Ssiz_t i=0; i<opt.Length(); i++) {
1207  Char_t c = opt[i];
1208  if ((c != '+') && ((c < '0') || (c > '9')))
1209  opt[i] = '_';
1210  }
1211  opt.ReplaceAll("_", "");
1212  TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1213  cpuPin = kTRUE;
1214  } else if (opt.BeginsWith("workers=")) {
1215 
1216  // Request for a given number of workers (within the max) or worker
1217  // startup combination:
1218  // workers=5 start max 5 workers (or less, if less are assigned)
1219  // workers=2x start max 2 workers per node (or less, if less are assigned)
1220  opt.ReplaceAll("workers=","");
1221  TProof::AddEnvVar("PROOF_NWORKERS", opt);
1222  }
1223  }
1224 
1225  // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1226  #ifdef R__LINUX
1227  if (IsLite() && cpuPin) {
1228  Printf("*** Requested CPU pinning ***");
1229  const TList *ev = GetEnvVars();
1230  const char *pinCmd = "taskset -c <cpupin>";
1231  TString val;
1232  TNamed *p;
1233  if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1234  val = p->GetTitle();
1235  val.Insert(val.Length()-1, " ");
1236  val.Insert(val.Length()-1, pinCmd);
1237  }
1238  else {
1239  val.Form("\"%s\"", pinCmd);
1240  }
1241  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1242  }
1243  #endif
1244 }
1245 
1246 ////////////////////////////////////////////////////////////////////////////////
1247 /// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1248 /// that the path is writable
1249 
1250 Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1251 {
1252  if (!inpath || strlen(inpath) <= 0) {
1253  Error("AssertPath", "undefined input path");
1254  return -1;
1255  }
1256 
1257  TString path(inpath);
1258  gSystem->ExpandPathName(path);
1259 
1260  if (gSystem->AccessPathName(path, kFileExists)) {
1261  if (gSystem->mkdir(path, kTRUE) != 0) {
1262  Error("AssertPath", "could not create path %s", path.Data());
1263  return -1;
1264  }
1265  }
1266  // It must be writable
1267  if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1268  if (gSystem->Chmod(path, 0666) != 0) {
1269  Error("AssertPath", "could not make path %s writable", path.Data());
1270  return -1;
1271  }
1272  }
1273 
1274  // Done
1275  return 0;
1276 }
1277 
1278 ////////////////////////////////////////////////////////////////////////////////
1279 /// Set manager and schedule its destruction after this for clean
1280 /// operations.
1281 
1283 {
1284  fManager = mgr;
1285 
1286  if (mgr) {
1288  gROOT->GetListOfSockets()->Remove(mgr);
1289  gROOT->GetListOfSockets()->Add(mgr);
1290  }
1291 }
1292 
1293 ////////////////////////////////////////////////////////////////////////////////
1294 /// Works on the master node only.
1295 /// It starts workers on the machines in workerList and sets the paths,
1296 /// packages and macros as on the master.
1297 /// It is a substitute for StartSlaves(...)
1298 /// The code is mostly the master part of StartSlaves,
1299 /// with the parallel startup removed.
1300 
1302 {
1303  if (!IsMaster()) {
1304  Error("AddWorkers", "AddWorkers can only be called on the master!");
1305  return -1;
1306  }
1307 
1308  if (!workerList || !(workerList->GetSize())) {
1309  Error("AddWorkers", "empty list of workers!");
1310  return -2;
1311  }
1312 
1313  // Code taken from the master part of StartSlaves with the parallel part removed
1314 
1315  fImage = gProofServ->GetImage();
1316  if (fImage.IsNull())
1317  fImage.Form("%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(), gProofServ->GetWorkDir());
1318 
1319  // Get all workers
1320  UInt_t nSlaves = workerList->GetSize();
1321  UInt_t nSlavesDone = 0;
1322  Int_t ord = 0;
1323 
1324  // Loop over all new workers and start them (if we had already workers it means we are
1325  // increasing parallelism or that is not the first time we are called)
1326  Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1327 
1328  // A list of TSlave objects for workers that are being added
1329  TList *addedWorkers = new TList();
1330  if (!addedWorkers) {
1331  // This is needed to silence Coverity ...
1332  Error("AddWorkers", "cannot create new list for the workers to be added");
1333  return -2;
1334  }
1335  addedWorkers->SetOwner(kFALSE);
1336  TListIter next(workerList);
1337  TObject *to;
1338  TProofNodeInfo *worker;
1339  TSlaveInfo *dummysi = new TSlaveInfo();
1340  while ((to = next())) {
1341  // Get the next worker from the list
1342  worker = (TProofNodeInfo *)to;
1343 
1344  // Read back worker node info
1345  const Char_t *image = worker->GetImage().Data();
1346  const Char_t *workdir = worker->GetWorkDir().Data();
1347  Int_t perfidx = worker->GetPerfIndex();
1348  Int_t sport = worker->GetPort();
1349  if (sport == -1)
1350  sport = fUrl.GetPort();
1351 
1352  // Create worker server
1353  TString fullord;
1354  if (worker->GetOrdinal().Length() > 0) {
1355  fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1356  } else {
1357  fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1358  }
1359 
1360  // Remove worker from the list of workers terminated gracefully
1361  dummysi->SetOrdinal(fullord);
1362  TSlaveInfo *rmsi = (TSlaveInfo *)fTerminatedSlaveInfos->Remove(dummysi);
1363  SafeDelete(rmsi);
1364 
1365  // Create worker server
1366  TString wn(worker->GetNodeName());
1367  if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1368  TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1369  // Add group info in the password field, if any
1370  if (strlen(gProofServ->GetGroup()) > 0) {
1371  // Set also the user, otherwise the password is not exported
1372  if (strlen(u.GetUser()) <= 0)
1373  u.SetUser(gProofServ->GetUser());
1375  }
1376  TSlave *slave = 0;
1377  if (worker->IsWorker()) {
1378  slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1379  } else {
1380  slave = CreateSubmaster(u.GetUrl(), fullord,
1381  image, worker->GetMsd(), worker->GetNWrks());
1382  }
1383 
1384  // Add to global list (we will add to the monitor list after
1385  // finalizing the server startup)
1386  Bool_t slaveOk = kTRUE;
1387  fSlaves->Add(slave);
1388  if (slave->IsValid()) {
1389  addedWorkers->Add(slave);
1390  } else {
1391  slaveOk = kFALSE;
1392  fBadSlaves->Add(slave);
1393  Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1394  }
1395 
1396  PDB(kGlobal,3)
1397  Info("AddWorkers", "worker on host %s created"
1398  " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1399 
1400  // Notify opening of connection
1401  nSlavesDone++;
1403  m << TString("Opening connections to workers") << nSlaves
1404  << nSlavesDone << slaveOk;
1405  gProofServ->GetSocket()->Send(m);
1406 
1407  ord++;
1408  } //end of the worker loop
1409  SafeDelete(dummysi);
1410 
1411  // Cleanup
1412  SafeDelete(workerList);
1413 
1414  nSlavesDone = 0;
1415 
1416  // Here we finalize the server startup: in this way the bulk
1417  // of remote operations are almost parallelized
1418  TIter nxsl(addedWorkers);
1419  TSlave *sl = 0;
1420  while ((sl = (TSlave *) nxsl())) {
1421 
1422  // Finalize setup of the server
1423  if (sl->IsValid())
1424  sl->SetupServ(TSlave::kSlave, 0);
1425 
1426  // Monitor good slaves
1427  Bool_t slaveOk = kTRUE;
1428  if (sl->IsValid()) {
1429  fAllMonitor->Add(sl->GetSocket());
1430  PDB(kGlobal,3)
1431  Info("AddWorkers", "worker on host %s finalized"
1432  " and added to list", sl->GetOrdinal());
1433  } else {
1434  slaveOk = kFALSE;
1435  fBadSlaves->Add(sl);
1436  }
1437 
1438  // Notify end of startup operations
1439  nSlavesDone++;
1441  m << TString("Setting up worker servers") << nSlaves
1442  << nSlavesDone << slaveOk;
1443  gProofServ->GetSocket()->Send(m);
1444  }
1445 
1446  // Now set new state on the added workers (on all workers for simplicity)
1447  // use fEnabledPackages, fLoadedMacros,
1448  // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1449  // no need to load packages that are only loaded and not enabled (dyn mode)
1450  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1451  TNamed *n = 0;
1452  if (TProof::GetEnvVars() &&
1453  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1454  TString s(n->GetTitle());
1455  if (s.IsDigit()) nwrk = s.Atoi();
1456  }
1457 
1458  if (fDynamicStartup && goMoreParallel) {
1459 
1460  PDB(kGlobal, 3)
1461  Info("AddWorkers", "will invoke GoMoreParallel()");
1462  Int_t nw = GoMoreParallel(nwrk);
1463  PDB(kGlobal, 3)
1464  Info("AddWorkers", "GoMoreParallel()=%d", nw);
1465 
1466  }
1467  else {
1468  // Not in Dynamic Workers mode
1469  PDB(kGlobal, 3)
1470  Info("AddWorkers", "will invoke GoParallel()");
1471  GoParallel(nwrk, kFALSE, 0);
1472  }
1473 
1474  // Set worker processing environment
1475  SetupWorkersEnv(addedWorkers, goMoreParallel);
1476 
1477  // Update list of current workers
1478  PDB(kGlobal, 3)
1479  Info("AddWorkers", "will invoke SaveWorkerInfo()");
1480  SaveWorkerInfo();
1481 
1482  // Inform the client that the number of workers has changed
1483  if (fDynamicStartup && gProofServ) {
1484  PDB(kGlobal, 3)
1485  Info("AddWorkers", "will invoke SendParallel()");
1487 
1488  if (goMoreParallel && fPlayer) {
1489  // In case we are adding workers dynamically to an existing process, we
1490  // should invoke a special player's Process() to set only added workers
1491  // to the proper state
1492  PDB(kGlobal, 3)
1493  Info("AddWorkers", "will send the PROCESS message to selected workers");
1494  fPlayer->JoinProcess(addedWorkers);
1495  // Update merger counters (new workers are not yet active)
1496  fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1497  }
1498  }
1499 
1500  // Cleanup
1501  delete addedWorkers;
1502 
1503  return 0;
1504 }
1505 
1506 ////////////////////////////////////////////////////////////////////////////////
1507 /// Set up packages, loaded macros, include and lib paths ...
1508 
1509 void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1510 {
1511  // Packages
1513  if (packs && packs->GetSize() > 0) {
1514  TIter nxp(packs);
1515  TPair *pck = 0;
1516  while ((pck = (TPair *) nxp())) {
1517  // Upload and Enable methods are intelligent and avoid
1518  // re-uploading or re-enabling of a package to a node that has it.
1519  if (fDynamicStartup && increasingWorkers) {
1520  // Upload only on added workers
1521  PDB(kGlobal, 3)
1522  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
1523  if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1524  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1525  } else {
1526  PDB(kGlobal, 3)
1527  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1528  if (UploadPackage(pck->GetName()) >= 0)
1529  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1530  }
1531  }
1532  }
1533 
1534  // Loaded macros
1535  if (fLoadedMacros) {
1536  TIter nxp(fLoadedMacros);
1537  TObjString *os = 0;
1538  while ((os = (TObjString *) nxp())) {
1539  PDB(kGlobal, 3) {
1540  Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1541  Printf("Loading a macro : %s", os->GetName());
1542  }
1543  Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1544  }
1545  }
1546 
1547  // Dynamic path
1548  TString dyn = gSystem->GetDynamicPath();
1549  dyn.ReplaceAll(":", " ");
1550  dyn.ReplaceAll("\"", " ");
1551  PDB(kGlobal, 3)
1552  Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1553  AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1554 
1555  // Include path
1556  TString inc = gSystem->GetIncludePath();
1557  inc.ReplaceAll("-I", " ");
1558  inc.ReplaceAll("\"", " ");
1559  PDB(kGlobal, 3)
1560  Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1561  AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1562 
1563  // Done
1564  return;
1565 }
1566 
1567 ////////////////////////////////////////////////////////////////////////////////
1568 /// Used for shutting down the workers after a query is finished.
1569 /// Sends a kPROOF_STOP message to each of the workers in workerList.
1570 /// If workerList == 0, shuts down all the workers.
1571 
1573 {
1574  if (!IsMaster()) {
1575  Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1576  return -1;
1577  }
1578 
1579  fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1580 
1581  if (!workerList) {
1582  // shutdown all the workers
1583  TIter nxsl(fSlaves);
1584  TSlave *sl = 0;
1585  while ((sl = (TSlave *) nxsl())) {
1586  // Shut down the worker assumig that it is not processing
1587  TerminateWorker(sl);
1588  }
1589 
1590  } else {
1591  if (!(workerList->GetSize())) {
1592  Error("RemoveWorkers", "The list of workers should not be empty!");
1593  return -2;
1594  }
1595 
1596  // Loop over all the workers and stop them
1597  TListIter next(workerList);
1598  TObject *to;
1599  TProofNodeInfo *worker;
1600  while ((to = next())) {
1601  TSlave *sl = 0;
1602  if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1603  // Get the next worker from the list
1604  worker = (TProofNodeInfo *)to;
1605  TIter nxsl(fSlaves);
1606  while ((sl = (TSlave *) nxsl())) {
1607  // Shut down the worker assumig that it is not processing
1608  if (sl->GetName() == worker->GetNodeName())
1609  break;
1610  }
1611  } else if (to->InheritsFrom(TSlave::Class())) {
1612  sl = (TSlave *) to;
1613  } else {
1614  Warning("RemoveWorkers","unknown object type: %s - it should be"
1615  " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1616  }
1617  // Shut down the worker assumig that it is not processing
1618  if (sl) {
1619  if (gDebug > 0)
1620  Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1621  TerminateWorker(sl);
1622  }
1623  }
1624  }
1625 
1626  // Update also the master counter
1627  if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1628 
1629  return 0;
1630 }
1631 
1632 ////////////////////////////////////////////////////////////////////////////////
1633 /// Start up PROOF slaves.
1634 
1636 {
1637  // If this is a master server, find the config file and start slave
1638  // servers as specified in the config file
1639  if (TestBit(TProof::kIsMaster)) {
1640 
1641  Int_t pc = 0;
1642  TList *workerList = new TList;
1643  // Get list of workers
1644  if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1645  TString emsg("no resource currently available for this session: please retry later");
1646  if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1647  gProofServ->SendAsynMessage(emsg.Data());
1648  return kFALSE;
1649  }
1650  // Setup the workers
1651  if (AddWorkers(workerList) < 0)
1652  return kFALSE;
1653 
1654  } else {
1655 
1656  // create master server
1657  Printf("Starting master: opening connection ...");
1658  TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1659 
1660  if (slave->IsValid()) {
1661 
1662  // Notify
1663  fprintf(stderr,"Starting master:"
1664  " connection open: setting up server ... \r");
1665  StartupMessage("Connection to master opened", kTRUE, 1, 1);
1666 
1667  if (!attach) {
1668 
1669  // Set worker interrupt handler
1670  slave->SetInterruptHandler(kTRUE);
1671 
1672  // Finalize setup of the server
1674 
1675  if (slave->IsValid()) {
1676 
1677  // Notify
1678  Printf("Starting master: OK ");
1679  StartupMessage("Master started", kTRUE, 1, 1);
1680 
1681  // check protocol compatibility
1682  // protocol 1 is not supported anymore
1683  if (fProtocol == 1) {
1684  Error("StartSlaves",
1685  "client and remote protocols not compatible (%d and %d)",
1687  slave->Close("S");
1688  delete slave;
1689  return kFALSE;
1690  }
1691 
1692  fSlaves->Add(slave);
1693  fAllMonitor->Add(slave->GetSocket());
1694 
1695  // Unset worker interrupt handler
1696  slave->SetInterruptHandler(kFALSE);
1697 
1698  // Set interrupt PROOF handler from now on
1699  fIntHandler = new TProofInterruptHandler(this);
1700 
1701  // Give-up after 5 minutes
1702  Int_t rc = Collect(slave, 300);
1703  Int_t slStatus = slave->GetStatus();
1704  if (slStatus == -99 || slStatus == -98 || rc == 0) {
1705  fSlaves->Remove(slave);
1706  fAllMonitor->Remove(slave->GetSocket());
1707  if (slStatus == -99)
1708  Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1709  else if (slStatus == -98)
1710  Error("StartSlaves", "could not setup output redirection on master");
1711  else
1712  Error("StartSlaves", "setting up master");
1713  slave->Close("S");
1714  delete slave;
1715  return 0;
1716  }
1717 
1718  if (!slave->IsValid()) {
1719  fSlaves->Remove(slave);
1720  fAllMonitor->Remove(slave->GetSocket());
1721  slave->Close("S");
1722  delete slave;
1723  Error("StartSlaves",
1724  "failed to setup connection with PROOF master server");
1725  return kFALSE;
1726  }
1727 
1728  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1729  if ((fProgressDialog =
1730  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1731  if (fProgressDialog->LoadPlugin() == -1)
1732  fProgressDialog = 0;
1733  }
1734  } else {
1735  // Notify
1736  Printf("Starting master: failure");
1737  }
1738  } else {
1739 
1740  // Notify
1741  Printf("Starting master: OK ");
1742  StartupMessage("Master attached", kTRUE, 1, 1);
1743 
1744  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1745  if ((fProgressDialog =
1746  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1747  if (fProgressDialog->LoadPlugin() == -1)
1748  fProgressDialog = 0;
1749  }
1750 
1751  fSlaves->Add(slave);
1752  fIntHandler = new TProofInterruptHandler(this);
1753  }
1754 
1755  } else {
1756  delete slave;
1757  // Notify only if verbosity is on: most likely the failure has already been notified
1758  if (gDebug > 0)
1759  Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1760  return kFALSE;
1761  }
1762  }
1763 
1764  return kTRUE;
1765 }
1766 
1767 ////////////////////////////////////////////////////////////////////////////////
1768 /// Close all open slave servers.
1769 /// A client can decide to shut down the remote session by passing option 'S'
1770 /// or 's'. The default for clients is to detach, if supported. Masters always
1771 /// shut down the remote counterpart.
1772 
1774 {
1775  { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1776 
1777  fValid = kFALSE;
1778  if (fSlaves) {
1779  if (fIntHandler)
1780  fIntHandler->Remove();
1781 
1782  TIter nxs(fSlaves);
1783  TSlave *sl = 0;
1784  while ((sl = (TSlave *)nxs()))
1785  sl->Close(opt);
1786 
1787  fActiveSlaves->Clear("nodelete");
1788  fUniqueSlaves->Clear("nodelete");
1789  fAllUniqueSlaves->Clear("nodelete");
1790  fNonUniqueMasters->Clear("nodelete");
1791  fBadSlaves->Clear("nodelete");
1792  fInactiveSlaves->Clear("nodelete");
1793  fSlaves->Delete();
1794  }
1795  }
1796 
1798  gROOT->GetListOfSockets()->Remove(this);
1799 
1800  if (fChains) {
1801  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1802  // remove "chain" from list
1803  chain->SetProof(0);
1804  RemoveChain(chain);
1805  }
1806  }
1807 
1808  if (IsProofd()) {
1809 
1810  gROOT->GetListOfProofs()->Remove(this);
1811  if (gProof && gProof == this) {
1812  // Set previous proofd-related as default
1813  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1814  while ((gProof = (TProof *)pvp())) {
1815  if (gProof->IsProofd())
1816  break;
1817  }
1818  }
1819  }
1820  }
1821 }
1822 
1823 ////////////////////////////////////////////////////////////////////////////////
1824 /// Create a new TSlave of type TSlave::kSlave.
1825 /// Note: creation of TSlave is private with TProof as a friend.
1826 /// Derived classes must use this function to create slaves.
1827 
1828 TSlave *TProof::CreateSlave(const char *url, const char *ord,
1829  Int_t perf, const char *image, const char *workdir)
1830 {
1831  TSlave* sl = TSlave::Create(url, ord, perf, image,
1832  this, TSlave::kSlave, workdir, 0);
1833 
1834  if (sl->IsValid()) {
1835  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1836  // must set fParallel to 1 for slaves since they do not
1837  // report their fParallel with a LOG_DONE message
1838  sl->fParallel = 1;
1839  }
1840 
1841  return sl;
1842 }
1843 
1844 
1845 ////////////////////////////////////////////////////////////////////////////////
1846 /// Create a new TSlave of type TSlave::kMaster.
1847 /// Note: creation of TSlave is private with TProof as a friend.
1848 /// Derived classes must use this function to create slaves.
1849 
1850 TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1851  const char *image, const char *msd, Int_t nwk)
1852 {
1853  TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1854  TSlave::kMaster, 0, msd, nwk);
1855 
1856  if (sl->IsValid()) {
1857  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1858  }
1859 
1860  return sl;
1861 }
1862 
1863 ////////////////////////////////////////////////////////////////////////////////
1864 /// Find slave that has TSocket s. Returns 0 in case slave is not found.
1865 
1867 {
1868  TSlave *sl;
1869  TIter next(fSlaves);
1870 
1871  while ((sl = (TSlave *)next())) {
1872  if (sl->IsValid() && sl->GetSocket() == s)
1873  return sl;
1874  }
1875  return 0;
1876 }
1877 
1878 ////////////////////////////////////////////////////////////////////////////////
1879 /// Add to the fUniqueSlaves list the active slaves that have a unique
1880 /// (user) file system image. This information is used to transfer files
1881 /// only once to nodes that share a file system (an image). Submasters
1882 /// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1883 /// list. That list is used to trigger the transferring of files to
1884 /// the submaster's unique slaves without the need to transfer the file
1885 /// to the submaster.
1886 
1888 {
1889  fUniqueSlaves->Clear();
1894 
1895  TIter next(fActiveSlaves);
1896 
1897  while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1898  if (fImage == sl->fImage) {
1899  if (sl->GetSlaveType() == TSlave::kMaster) {
1900  fNonUniqueMasters->Add(sl);
1901  fAllUniqueSlaves->Add(sl);
1902  fAllUniqueMonitor->Add(sl->GetSocket());
1903  }
1904  continue;
1905  }
1906 
1907  TIter next2(fUniqueSlaves);
1908  TSlave *replace_slave = 0;
1909  Bool_t add = kTRUE;
1910  while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1911  if (sl->fImage == sl2->fImage) {
1912  add = kFALSE;
1913  if (sl->GetSlaveType() == TSlave::kMaster) {
1914  if (sl2->GetSlaveType() == TSlave::kSlave) {
1915  // give preference to master
1916  replace_slave = sl2;
1917  add = kTRUE;
1918  } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1919  fNonUniqueMasters->Add(sl);
1920  fAllUniqueSlaves->Add(sl);
1921  fAllUniqueMonitor->Add(sl->GetSocket());
1922  } else {
1923  Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1924  R__ASSERT(0);
1925  }
1926  }
1927  break;
1928  }
1929  }
1930 
1931  if (add) {
1932  fUniqueSlaves->Add(sl);
1933  fAllUniqueSlaves->Add(sl);
1934  fUniqueMonitor->Add(sl->GetSocket());
1935  fAllUniqueMonitor->Add(sl->GetSocket());
1936  if (replace_slave) {
1937  fUniqueSlaves->Remove(replace_slave);
1938  fAllUniqueSlaves->Remove(replace_slave);
1939  fUniqueMonitor->Remove(replace_slave->GetSocket());
1940  fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1941  }
1942  }
1943  }
1944 
1945  // will be activated in Collect()
1948 }
1949 
1950 ////////////////////////////////////////////////////////////////////////////////
1951 /// Return number of slaves as described in the config file.
1952 
1954 {
1955  return fSlaves->GetSize();
1956 }
1957 
1958 ////////////////////////////////////////////////////////////////////////////////
1959 /// Return number of active slaves, i.e. slaves that are valid and in
1960 /// the current computing group.
1961 
1963 {
1964  return fActiveSlaves->GetSize();
1965 }
1966 
1967 ////////////////////////////////////////////////////////////////////////////////
1968 /// Return number of inactive slaves, i.e. slaves that are valid but not in
1969 /// the current computing group.
1970 
1972 {
1973  return fInactiveSlaves->GetSize();
1974 }
1975 
1976 ////////////////////////////////////////////////////////////////////////////////
1977 /// Return number of unique slaves, i.e. active slaves that each have a
1978 /// distinct user file system.
1979 
1981 {
1982  return fUniqueSlaves->GetSize();
1983 }
1984 
1985 ////////////////////////////////////////////////////////////////////////////////
1986 /// Return number of bad slaves. These are slaves that were in the config
1987 /// file, but refused to start up or died during the PROOF session.
1988 
1990 {
1991  return fBadSlaves->GetSize();
1992 }
1993 
1994 ////////////////////////////////////////////////////////////////////////////////
1995 /// Ask for the statistics of the slaves.
1996 
1998 {
1999  if (!IsValid()) return;
2000 
2003 }
2004 
2005 ////////////////////////////////////////////////////////////////////////////////
2006 /// Get statistics about CPU time, real time and bytes read.
2007 /// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2008 /// and GetBytesRead()).
2009 
2011 {
2012  if (fProtocol > 27) {
2013  // This returns the correct result
2014  AskStatistics();
2015  } else {
2016  // AskStatistics is buggy: parse the output of Print()
2017  RedirectHandle_t rh;
2018  gSystem->RedirectOutput(fLogFileName, "a", &rh);
2019  Print();
2020  gSystem->RedirectOutput(0, 0, &rh);
2021  TMacro *mp = GetLastLog();
2022  if (mp) {
2023  // Look for global directories
2024  TIter nxl(mp->GetListOfLines());
2025  TObjString *os = 0;
2026  while ((os = (TObjString *) nxl())) {
2027  TString s(os->GetName());
2028  if (s.Contains("Total MB's processed:")) {
2029  s.ReplaceAll("Total MB's processed:", "");
2030  if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2031  } else if (s.Contains("Total real time used (s):")) {
2032  s.ReplaceAll("Total real time used (s):", "");
2033  if (s.IsFloat()) fRealTime = s.Atof();
2034  } else if (s.Contains("Total CPU time used (s):")) {
2035  s.ReplaceAll("Total CPU time used (s):", "");
2036  if (s.IsFloat()) fCpuTime = s.Atof();
2037  }
2038  }
2039  delete mp;
2040  }
2041  }
2042 
2043  if (verbose) {
2044  Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2045  GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2046  }
2047 }
2048 
2049 ////////////////////////////////////////////////////////////////////////////////
2050 /// Ask for the number of parallel slaves.
2051 
2053 {
2054  if (!IsValid()) return;
2055 
2058 }
2059 
2060 ////////////////////////////////////////////////////////////////////////////////
2061 /// Ask the master for the list of queries.
2062 
2064 {
2065  if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2066 
2067  Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
2069  m << all;
2070  Broadcast(m, kActive);
2072 
2073  // This should have been filled by now
2074  return fQueries;
2075 }
2076 
2077 ////////////////////////////////////////////////////////////////////////////////
2078 /// Number of queries processed by this session
2079 
2081 {
2082  if (fQueries)
2083  return fQueries->GetSize() - fOtherQueries;
2084  return 0;
2085 }
2086 
2087 ////////////////////////////////////////////////////////////////////////////////
2088 /// Set max number of draw queries whose results are saved
2089 
2091 {
2092  if (max > 0) {
2093  if (fPlayer)
2094  fPlayer->SetMaxDrawQueries(max);
2095  fMaxDrawQueries = max;
2096  }
2097 }
2098 
2099 ////////////////////////////////////////////////////////////////////////////////
2100 /// Get max number of queries whose full results are kept in the
2101 /// remote sandbox
2102 
2104 {
2106  m << kFALSE;
2107  Broadcast(m, kActive);
2109 }
2110 
2111 ////////////////////////////////////////////////////////////////////////////////
2112 /// Return pointer to the list of query results in the player
2113 
2115 {
2116  return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2117 }
2118 
2119 ////////////////////////////////////////////////////////////////////////////////
2120 /// Return pointer to the full TQueryResult instance owned by the player
2121 /// and referenced by 'ref'. If ref = 0 or "", return the last query result.
2122 
2124 {
2125  return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2126 }
2127 
2128 ////////////////////////////////////////////////////////////////////////////////
2129 /// Ask the master for the list of queries.
2130 /// Options:
2131 /// "A" show information about all the queries known to the
2132 /// server, i.e. even those processed by other sessions
2133 /// "L" show only information about queries locally available
2134 /// i.e. already retrieved. If "L" is specified, "A" is
2135 /// ignored.
2136 /// "F" show all details available about queries
2137 /// "H" print help menu
2138 /// Default ""
2139 
2141 {
2142  Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2143  if (help) {
2144 
2145  // Help
2146 
2147  Printf("+++");
2148  Printf("+++ Options: \"A\" show all queries known to server");
2149  Printf("+++ \"L\" show retrieved queries");
2150  Printf("+++ \"F\" full listing of query info");
2151  Printf("+++ \"H\" print this menu");
2152  Printf("+++");
2153  Printf("+++ (case insensitive)");
2154  Printf("+++");
2155  Printf("+++ Use Retrieve(<#>) to retrieve the full"
2156  " query results from the master");
2157  Printf("+++ e.g. Retrieve(8)");
2158 
2159  Printf("+++");
2160 
2161  return;
2162  }
2163 
2164  if (!IsValid()) return;
2165 
2166  Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2167 
2168  TObject *pq = 0;
2169  if (!local) {
2170  GetListOfQueries(opt);
2171 
2172  if (!fQueries) return;
2173 
2174  TIter nxq(fQueries);
2175 
2176  // Queries processed by other sessions
2177  if (fOtherQueries > 0) {
2178  Printf("+++");
2179  Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2180  Int_t nq = 0;
2181  while (nq++ < fOtherQueries && (pq = nxq()))
2182  pq->Print(opt);
2183  }
2184 
2185  // Queries processed by this session
2186  Printf("+++");
2187  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2189  while ((pq = nxq()))
2190  pq->Print(opt);
2191 
2192  } else {
2193 
2194  // Queries processed by this session
2195  Printf("+++");
2196  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2198 
2199  // Queries available locally
2200  TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2201  if (listlocal) {
2202  Printf("+++");
2203  Printf("+++ Queries available locally: %d", listlocal->GetSize());
2204  TIter nxlq(listlocal);
2205  while ((pq = nxlq()))
2206  pq->Print(opt);
2207  }
2208  }
2209  Printf("+++");
2210 }
2211 
2212 ////////////////////////////////////////////////////////////////////////////////
2213 /// See if the data is ready to be analyzed.
2214 
2215 Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
2216 {
2217  if (!IsValid()) return kFALSE;
2218 
2219  TList submasters;
2220  TIter nextSlave(GetListOfActiveSlaves());
2221  while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2222  if (sl->GetSlaveType() == TSlave::kMaster) {
2223  submasters.Add(sl);
2224  }
2225  }
2226 
2227  fDataReady = kTRUE; //see if any submasters set it to false
2228  fBytesReady = 0;
2229  fTotalBytes = 0;
2230  //loop over submasters and see if data is ready
2231  if (submasters.GetSize() > 0) {
2232  Broadcast(kPROOF_DATA_READY, &submasters);
2233  Collect(&submasters);
2234  }
2235 
2236  bytesready = fBytesReady;
2237  totalbytes = fTotalBytes;
2238 
2239  EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2240 
2241  PDB(kGlobal,2)
2242  Info("IsDataReady", "%lld / %lld (%s)",
2243  bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2244 
2245  return fDataReady;
2246 }
2247 
2248 ////////////////////////////////////////////////////////////////////////////////
2249 /// Send interrupt to master or slave servers.
2250 
2252 {
2253  if (!IsValid()) return;
2254 
2255  TList *slaves = 0;
2256  if (list == kAll) slaves = fSlaves;
2257  if (list == kActive) slaves = fActiveSlaves;
2258  if (list == kUnique) slaves = fUniqueSlaves;
2259  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2260 
2261  if (slaves->GetSize() == 0) return;
2262 
2263  TSlave *sl;
2264  TIter next(slaves);
2265 
2266  while ((sl = (TSlave *)next())) {
2267  if (sl->IsValid()) {
2268 
2269  // Ask slave to propagate the interrupt request
2270  sl->Interrupt((Int_t)type);
2271  }
2272  }
2273 }
2274 
2275 ////////////////////////////////////////////////////////////////////////////////
2276 /// Returns number of slaves active in parallel mode. Returns 0 in case
2277 /// there are no active slaves. Returns -1 in case of error.
2278 
2280 {
2281  if (!IsValid()) return -1;
2282 
2283  // iterate over active slaves and return total number of slaves
2284  TIter nextSlave(GetListOfActiveSlaves());
2285  Int_t nparallel = 0;
2286  while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2287  if (sl->GetParallel() >= 0)
2288  nparallel += sl->GetParallel();
2289 
2290  return nparallel;
2291 }
2292 
2293 ////////////////////////////////////////////////////////////////////////////////
2294 /// Returns list of TSlaveInfo's. In case of error return 0.
2295 
2297 {
2298  if (!IsValid()) return 0;
2299 
2300  if (fSlaveInfo == 0) {
2302  fSlaveInfo->SetOwner();
2303  } else {
2304  fSlaveInfo->Delete();
2305  }
2306 
2307  TList masters;
2308  TIter next(GetListOfSlaves());
2309  TSlave *slave;
2310 
2311  while ((slave = (TSlave *) next()) != 0) {
2312  if (slave->GetSlaveType() == TSlave::kSlave) {
2313  const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2314  TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2315  name,
2316  slave->GetPerfIdx());
2317  fSlaveInfo->Add(slaveinfo);
2318 
2319  TIter nextactive(GetListOfActiveSlaves());
2320  TSlave *activeslave;
2321  while ((activeslave = (TSlave *) nextactive())) {
2322  if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2323  slaveinfo->SetStatus(TSlaveInfo::kActive);
2324  break;
2325  }
2326  }
2327 
2328  TIter nextbad(GetListOfBadSlaves());
2329  TSlave *badslave;
2330  while ((badslave = (TSlave *) nextbad())) {
2331  if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2332  slaveinfo->SetStatus(TSlaveInfo::kBad);
2333  break;
2334  }
2335  }
2336  // Get system info if supported
2337  if (slave->IsValid()) {
2338  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2339  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2340  else
2341  masters.Add(slave);
2342  }
2343 
2344  } else if (slave->GetSlaveType() == TSlave::kMaster) {
2345  if (slave->IsValid()) {
2346  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2347  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2348  else
2349  masters.Add(slave);
2350  }
2351  } else {
2352  Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2353  R__ASSERT(0);
2354  }
2355  }
2356  if (masters.GetSize() > 0) Collect(&masters);
2357 
2358  return fSlaveInfo;
2359 }
2360 
2361 ////////////////////////////////////////////////////////////////////////////////
2362 /// Activate slave server list.
2363 
2365 {
2366  TMonitor *mon = fAllMonitor;
2367  mon->DeActivateAll();
2368 
2369  slaves = !slaves ? fActiveSlaves : slaves;
2370 
2371  TIter next(slaves);
2372  TSlave *sl;
2373  while ((sl = (TSlave*) next())) {
2374  if (sl->IsValid())
2375  mon->Activate(sl->GetSocket());
2376  }
2377 }
2378 
2379 ////////////////////////////////////////////////////////////////////////////////
2380 /// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2381 /// monitored by 'mon'.
2382 
2384 {
2385  TMonitor *m = (mon) ? mon : fCurrentMonitor;
2386  if (m) {
2387  if (on)
2388  m->ActivateAll();
2389  else
2390  m->DeActivateAll();
2391  }
2392 }
2393 
2394 ////////////////////////////////////////////////////////////////////////////////
2395 /// Broadcast the group priority to all workers in the specified list. Returns
2396 /// the number of workers the message was successfully sent to.
2397 /// Returns -1 in case of error.
2398 
2399 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2400 {
2401  if (!IsValid()) return -1;
2402 
2403  if (workers->GetSize() == 0) return 0;
2404 
2405  int nsent = 0;
2406  TIter next(workers);
2407 
2408  TSlave *wrk;
2409  while ((wrk = (TSlave *)next())) {
2410  if (wrk->IsValid()) {
2411  if (wrk->SendGroupPriority(grp, priority) == -1)
2412  MarkBad(wrk, "could not send group priority");
2413  else
2414  nsent++;
2415  }
2416  }
2417 
2418  return nsent;
2419 }
2420 
2421 ////////////////////////////////////////////////////////////////////////////////
2422 /// Broadcast the group priority to all workers in the specified list. Returns
2423 /// the number of workers the message was successfully sent to.
2424 /// Returns -1 in case of error.
2425 
2426 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2427 {
2428  TList *workers = 0;
2429  if (list == kAll) workers = fSlaves;
2430  if (list == kActive) workers = fActiveSlaves;
2431  if (list == kUnique) workers = fUniqueSlaves;
2432  if (list == kAllUnique) workers = fAllUniqueSlaves;
2433 
2434  return BroadcastGroupPriority(grp, priority, workers);
2435 }
2436 
2437 ////////////////////////////////////////////////////////////////////////////////
2438 /// Reset the merge progress notificator
2439 
2441 {
2443 }
2444 
2445 ////////////////////////////////////////////////////////////////////////////////
2446 /// Broadcast a message to all slaves in the specified list. Returns
2447 /// the number of slaves the message was successfully sent to.
2448 /// Returns -1 in case of error.
2449 
2450 Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
2451 {
2452  if (!IsValid()) return -1;
2453 
2454  if (!slaves || slaves->GetSize() == 0) return 0;
2455 
2456  int nsent = 0;
2457  TIter next(slaves);
2458 
2459  TSlave *sl;
2460  while ((sl = (TSlave *)next())) {
2461  if (sl->IsValid()) {
2462  if (sl->GetSocket()->Send(mess) == -1)
2463  MarkBad(sl, "could not broadcast request");
2464  else
2465  nsent++;
2466  }
2467  }
2468 
2469  return nsent;
2470 }
2471 
2472 ////////////////////////////////////////////////////////////////////////////////
2473 /// Broadcast a message to all slaves in the specified list (either
2474 /// all slaves or only the active slaves). Returns the number of slaves
2475 /// the message was successfully sent to. Returns -1 in case of error.
2476 
2478 {
2479  TList *slaves = 0;
2480  if (list == kAll) slaves = fSlaves;
2481  if (list == kActive) slaves = fActiveSlaves;
2482  if (list == kUnique) slaves = fUniqueSlaves;
2483  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2484 
2485  return Broadcast(mess, slaves);
2486 }
2487 
2488 ////////////////////////////////////////////////////////////////////////////////
2489 /// Broadcast a character string buffer to all slaves in the specified
2490 /// list. Use kind to set the TMessage what field. Returns the number of
2491 /// slaves the message was sent to. Returns -1 in case of error.
2492 
2493 Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2494 {
2495  TMessage mess(kind);
2496  if (str) mess.WriteString(str);
2497  return Broadcast(mess, slaves);
2498 }
2499 
2500 ////////////////////////////////////////////////////////////////////////////////
2501 /// Broadcast a character string buffer to all slaves in the specified
2502 /// list (either all slaves or only the active slaves). Use kind to
2503 /// set the TMessage what field. Returns the number of slaves the message
2504 /// was sent to. Returns -1 in case of error.
2505 
2506 Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2507 {
2508  TMessage mess(kind);
2509  if (str) mess.WriteString(str);
2510  return Broadcast(mess, list);
2511 }
2512 
2513 ////////////////////////////////////////////////////////////////////////////////
2514 /// Broadcast an object to all slaves in the specified list. Use kind to
2515 /// set the TMessage what field. Returns the number of slaves the message
2516 /// was sent to. Returns -1 in case of error.
2517 
2519 {
2520  TMessage mess(kind);
2521  mess.WriteObject(obj);
2522  return Broadcast(mess, slaves);
2523 }
2524 
2525 ////////////////////////////////////////////////////////////////////////////////
2526 /// Broadcast an object to all slaves in the specified list. Use kind to
2527 /// set the TMessage what field. Returns the number of slaves the message
2528 /// was sent to. Returns -1 in case of error.
2529 
2531 {
2532  TMessage mess(kind);
2533  mess.WriteObject(obj);
2534  return Broadcast(mess, list);
2535 }
2536 
2537 ////////////////////////////////////////////////////////////////////////////////
2538 /// Broadcast a raw buffer of specified length to all slaves in the
2539 /// specified list. Returns the number of slaves the buffer was sent to.
2540 /// Returns -1 in case of error.
2541 
2542 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2543 {
2544  if (!IsValid()) return -1;
2545 
2546  if (slaves->GetSize() == 0) return 0;
2547 
2548  int nsent = 0;
2549  TIter next(slaves);
2550 
2551  TSlave *sl;
2552  while ((sl = (TSlave *)next())) {
2553  if (sl->IsValid()) {
2554  if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2555  MarkBad(sl, "could not send broadcast-raw request");
2556  else
2557  nsent++;
2558  }
2559  }
2560 
2561  return nsent;
2562 }
2563 
2564 ////////////////////////////////////////////////////////////////////////////////
2565 /// Broadcast a raw buffer of specified length to all slaves in the
2566 /// specified list. Returns the number of slaves the buffer was sent to.
2567 /// Returns -1 in case of error.
2568 
2569 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
2570 {
2571  TList *slaves = 0;
2572  if (list == kAll) slaves = fSlaves;
2573  if (list == kActive) slaves = fActiveSlaves;
2574  if (list == kUnique) slaves = fUniqueSlaves;
2575  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2576 
2577  return BroadcastRaw(buffer, length, slaves);
2578 }
2579 
2580 ////////////////////////////////////////////////////////////////////////////////
2581 /// Broadcast file to all workers in the specified list. Returns the number of workers
2582 /// the buffer was sent to.
2583 /// Returns -1 in case of error.
2584 
2585 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2586 {
2587  if (!IsValid()) return -1;
2588 
2589  if (wrks->GetSize() == 0) return 0;
2590 
2591  int nsent = 0;
2592  TIter next(wrks);
2593 
2594  TSlave *wrk;
2595  while ((wrk = (TSlave *)next())) {
2596  if (wrk->IsValid()) {
2597  if (SendFile(file, opt, rfile, wrk) < 0)
2598  Error("BroadcastFile",
2599  "problems sending file to worker %s (%s)",
2600  wrk->GetOrdinal(), wrk->GetName());
2601  else
2602  nsent++;
2603  }
2604  }
2605 
2606  return nsent;
2607 }
2608 
2609 ////////////////////////////////////////////////////////////////////////////////
2610 /// Broadcast file to all workers in the specified list. Returns the number of workers
2611 /// the buffer was sent to.
2612 /// Returns -1 in case of error.
2613 
2614 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2615 {
2616  TList *wrks = 0;
2617  if (list == kAll) wrks = fSlaves;
2618  if (list == kActive) wrks = fActiveSlaves;
2619  if (list == kUnique) wrks = fUniqueSlaves;
2620  if (list == kAllUnique) wrks = fAllUniqueSlaves;
2621 
2622  return BroadcastFile(file, opt, rfile, wrks);
2623 }
2624 
2625 ////////////////////////////////////////////////////////////////////////////////
2626 /// Release a monitor after use, making sure to delete newly created
2627 /// monitors.
2628 
2630 {
2631  if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2632  && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2633  delete mon;
2634  }
2635 }
2636 
2637 ////////////////////////////////////////////////////////////////////////////////
2638 /// Collect responses from slave sl. Returns the number of slaves that
2639 /// responded (=1).
2640 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2641 /// which means wait forever).
2642 /// If defined (>= 0) endtype is the message that stops this collection.
2643 
2644 Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2645 {
2646  Int_t rc = 0;
2647 
2648  TMonitor *mon = 0;
2649  if (!sl->IsValid()) return 0;
2650 
2651  if (fCurrentMonitor == fAllMonitor) {
2652  mon = new TMonitor;
2653  } else {
2654  mon = fAllMonitor;
2655  mon->DeActivateAll();
2656  }
2657  mon->Activate(sl->GetSocket());
2658 
2659  rc = Collect(mon, timeout, endtype, deactonfail);
2660  ReleaseMonitor(mon);
2661  return rc;
2662 }
2663 
2664 ////////////////////////////////////////////////////////////////////////////////
2665 /// Collect responses from the slave servers. Returns the number of slaves
2666 /// that responded.
2667 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2668 /// which means wait forever).
2669 /// If defined (>= 0) endtype is the message that stops this collection.
2670 
2671 Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2672 {
2673  Int_t rc = 0;
2674 
2675  TMonitor *mon = 0;
2676 
2677  if (fCurrentMonitor == fAllMonitor) {
2678  mon = new TMonitor;
2679  } else {
2680  mon = fAllMonitor;
2681  mon->DeActivateAll();
2682  }
2683  TIter next(slaves);
2684  TSlave *sl;
2685  while ((sl = (TSlave*) next())) {
2686  if (sl->IsValid())
2687  mon->Activate(sl->GetSocket());
2688  }
2689 
2690  rc = Collect(mon, timeout, endtype, deactonfail);
2691  ReleaseMonitor(mon);
2692  return rc;
2693 }
2694 
2695 ////////////////////////////////////////////////////////////////////////////////
2696 /// Collect responses from the slave servers. Returns the number of slaves
2697 /// that responded.
2698 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2699 /// which means wait forever).
2700 /// If defined (>= 0) endtype is the message that stops this collection.
2701 
2702 Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2703 {
2704  Int_t rc = 0;
2705  TMonitor *mon = 0;
2706 
2707  if (list == kAll) mon = fAllMonitor;
2708  if (list == kActive) mon = fActiveMonitor;
2709  if (list == kUnique) mon = fUniqueMonitor;
2710  if (list == kAllUnique) mon = fAllUniqueMonitor;
2711  if (fCurrentMonitor == mon) {
2712  // Get a copy
2713  mon = new TMonitor(*mon);
2714  }
2715  mon->ActivateAll();
2716 
2717  rc = Collect(mon, timeout, endtype, deactonfail);
2718  ReleaseMonitor(mon);
2719  return rc;
2720 }
2721 
2722 ////////////////////////////////////////////////////////////////////////////////
2723 /// Collect responses from the slave servers. Returns the number of messages
2724 /// received. Can be 0 if there are no active slaves.
2725 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2726 /// which means wait forever).
2727 /// If defined (>= 0) endtype is the message that stops this collection.
2728 /// Collect also stops its execution from time to time to check for new
2729 /// workers in Dynamic Startup mode.
2730 
2731 Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2732 {
2733  Int_t collectId = gRandom->Integer(9999);
2734 
2735  PDB(kCollect, 3)
2736  Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2737 
2738  // Reset the status flag and clear the messages in the list, if any
2739  fStatus = 0;
2740  fRecvMessages->Clear();
2741 
2742  Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2743 
2744  if (!mon->GetActive(actto)) return 0;
2745 
2747 
2748  // Used by external code to know what we are monitoring
2749  TMonitor *savedMonitor = 0;
2750  if (fCurrentMonitor) {
2751  savedMonitor = fCurrentMonitor;
2752  fCurrentMonitor = mon;
2753  } else {
2754  fCurrentMonitor = mon;
2755  fBytesRead = 0;
2756  fRealTime = 0.0;
2757  fCpuTime = 0.0;
2758  }
2759 
2760  // We want messages on the main window during synchronous collection,
2761  // but we save the present status to restore it at the end
2762  Bool_t saveRedirLog = fRedirLog;
2763  if (!IsIdle() && !IsSync())
2764  fRedirLog = kFALSE;
2765 
2766  int cnt = 0, rc = 0;
2767 
2768  // Timeout counter
2769  Long_t nto = timeout;
2770  PDB(kCollect, 2)
2771  Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2772 
2773  // On clients, handle Ctrl-C during collection
2774  if (fIntHandler)
2775  fIntHandler->Add();
2776 
2777  // Sockets w/o activity during the last 'sto' millisecs are deactivated
2778  Int_t nact = 0;
2779  Long_t sto = -1;
2780  Int_t nsto = 60;
2781  Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2782  mon->ResetInterrupt();
2783  while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2784 
2785  // Dump last waiting sockets, if in debug mode
2786  PDB(kCollect, 2) {
2787  if (nact < 4) {
2788  TList *al = mon->GetListOfActives();
2789  if (al && al->GetSize() > 0) {
2790  Info("Collect"," %d node(s) still active:", al->GetSize());
2791  TIter nxs(al);
2792  TSocket *xs = 0;
2793  while ((xs = (TSocket *)nxs())) {
2794  TSlave *wrk = FindSlave(xs);
2795  if (wrk)
2796  Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2797  else
2798  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2799  xs->GetInetAddress().GetPort());
2800  }
2801  }
2802  }
2803  }
2804 
2805  // Preemptive poll for new workers on the master only in Dynamic Mode and only
2806  // during processing (TODO: should work on Top Master only)
2808  ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2811  fLastPollWorkers_s = time(0);
2813  PDB(kCollect, 1)
2814  Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2815  }
2816 
2817  // Wait for a ready socket
2818  PDB(kCollect, 3)
2819  Info("Collect", "Will invoke Select() #%04d", collectId);
2820  TSocket *s = mon->Select(1000);
2821 
2822  if (s && s != (TSocket *)(-1)) {
2823  // Get and analyse the info it did receive
2824  rc = CollectInputFrom(s, endtype, deactonfail);
2825  if (rc == 1 || (rc == 2 && !savedMonitor)) {
2826  // Deactivate it if we are done with it
2827  mon->DeActivate(s);
2828  PDB(kCollect, 2)
2829  Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2830  s, mon->GetActive(),
2831  mon->GetListOfActives()->First());
2832  } else if (rc == 2) {
2833  // This end message was for the saved monitor
2834  // Deactivate it if we are done with it
2835  if (savedMonitor) {
2836  savedMonitor->DeActivate(s);
2837  PDB(kCollect, 2)
2838  Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2839  s, savedMonitor->GetActive(),
2840  savedMonitor->GetListOfActives()->First());
2841  }
2842  }
2843 
2844  // Update counter (if no error occurred)
2845  if (rc >= 0)
2846  cnt++;
2847  } else {
2848  // If not timed-out, exit if not stopped or not aborted
2849  // (player exits status is finished in such a case); otherwise,
2850  // we still need to collect the partial output info
2851  if (!s)
2853  mon->DeActivateAll();
2854  // Decrease the timeout counter if requested
2855  if (s == (TSocket *)(-1) && nto > 0)
2856  nto--;
2857  }
2858 
2859  // Check if there are workers with ready output to be sent and ask the first to send it
2860  if (IsMaster() && fWrksOutputReady && fWrksOutputReady->GetSize() > 0) {
2861  // Maximum number of concurrent sendings
2862  Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2863  if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2864  mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2865  TIter nxwr(fWrksOutputReady);
2866  TSlave *wrk = 0;
2867  while (mxws && (wrk = (TSlave *) nxwr())) {
2868  if (!wrk->TestBit(TSlave::kOutputRequested)) {
2869  // Ask worker for output
2870  TMessage sendoutput(kPROOF_SENDOUTPUT);
2871  PDB(kCollect, 2)
2872  Info("Collect", "worker %s was asked to send its output to master",
2873  wrk->GetOrdinal());
2874  if (wrk->GetSocket()->Send(sendoutput) != 1) {
2876  mxws--;
2877  }
2878  } else {
2879  // Count
2880  mxws--;
2881  }
2882  }
2883  }
2884 
2885  // Check if we need to check the socket activity (we do it every 60 cycles ~ 60 sec)
2886  sto = -1;
2887  if (--nsto <= 0) {
2888  sto = (Long_t) actto;
2889  nsto = 60;
2890  }
2891 
2892  } // end loop over active monitors
2893 
2894  // If timed-out, deactivate the remaining sockets
2895  if (nto == 0) {
2896  TList *al = mon->GetListOfActives();
2897  if (al && al->GetSize() > 0) {
2898  // Notify the name of those which did timeout
2899  Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2900  TIter nxs(al);
2901  TSocket *xs = 0;
2902  while ((xs = (TSocket *)nxs())) {
2903  TSlave *wrk = FindSlave(xs);
2904  if (wrk)
2905  Info("Collect"," %s", wrk->GetName());
2906  else
2907  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2908  xs->GetInetAddress().GetPort());
2909  }
2910  }
2911  mon->DeActivateAll();
2912  }
2913 
2914  // Deactivate Ctrl-C special handler
2915  if (fIntHandler)
2916  fIntHandler->Remove();
2917 
2918  // make sure group view is up to date
2919  SendGroupView();
2920 
2921  // Restore redirection setting
2922  fRedirLog = saveRedirLog;
2923 
2924  // Restore the monitor
2925  fCurrentMonitor = savedMonitor;
2926 
2928 
2929  PDB(kCollect, 3)
2930  Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2931 
2932  return cnt;
2933 }
2934 
2935 ////////////////////////////////////////////////////////////////////////////////
2936 /// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2937 /// them. Returns the number of new workers found, or <0 on errors.
2938 
2940 {
2941  // Requests for worker updates
2942  Int_t dummy = 0;
2943  TList *reqWorkers = new TList();
2944  reqWorkers->SetOwner(kFALSE);
2945 
2946  if (!TestBit(TProof::kIsMaster)) {
2947  Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2948  return -1;
2949  }
2950  if (!gProofServ) {
2951  Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2952  return -1;
2953  }
2954 
2955  gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2956 
2957  // List of new workers only (TProofNodeInfo)
2958  TList *newWorkers = new TList();
2959  newWorkers->SetOwner(kTRUE);
2960 
2961  TIter next(reqWorkers);
2962  TProofNodeInfo *ni;
2963  TString fullOrd;
2964  while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2965 
2966  // Form the full ordinal
2967  fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2968 
2969  TIter nextInner(fSlaves);
2970  TSlave *sl;
2971  Bool_t found = kFALSE;
2972  while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2973  if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
2974  found = kTRUE;
2975  break;
2976  }
2977  }
2978 
2979  if (found) delete ni;
2980  else {
2981  newWorkers->Add(ni);
2982  PDB(kGlobal, 1)
2983  Info("PollForNewWorkers", "New worker found: %s:%s",
2984  ni->GetNodeName().Data(), fullOrd.Data());
2985  }
2986  }
2987 
2988  delete reqWorkers; // not owner
2989 
2990  Int_t nNewWorkers = newWorkers->GetEntries();
2991 
2992  // Add the new workers
2993  if (nNewWorkers > 0) {
2994  PDB(kGlobal, 1)
2995  Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
2996  Int_t rv = AddWorkers(newWorkers);
2997  if (rv < 0) {
2998  Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
2999  return -1;
3000  }
3001  // Don't delete newWorkers: AddWorkers() will do that
3002  }
3003  else {
3004  PDB(kGlobal, 2)
3005  Info("PollForNewWorkers", "No new worker found");
3006  delete newWorkers;
3007  }
3008 
3009  return nNewWorkers;
3010 }
3011 
3012 ////////////////////////////////////////////////////////////////////////////////
3013 /// Remove links to objects in list 'ol' from gDirectory
3014 
3016 {
3017  if (ol) {
3018  TIter nxo(ol);
3019  TObject *o = 0;
3020  while ((o = nxo()))
3021  gDirectory->RecursiveRemove(o);
3022  }
3023 }
3024 
3025 ////////////////////////////////////////////////////////////////////////////////
3026 /// Collect and analyze available input from socket s.
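3027 /// Returns the code from HandleInputMessage() (0 on success, 1 when done with this socket), 2 when such a final message is not of type 'endtype', -1 if any failure occurs.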
3028 
3030 {
3031  TMessage *mess;
3032 
3033  Int_t recvrc = 0;
3034  if ((recvrc = s->Recv(mess)) < 0) {
3035  PDB(kCollect,2)
3036  Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3037  Bool_t bad = kTRUE;
3038  if (recvrc == -5) {
3039  // Broken connection: try reconnection
3041  if (s->Reconnect() == 0) {
3043  bad = kFALSE;
3044  }
3045  }
3046  if (bad)
3047  MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3048  // Ignore this wake up
3049  return -1;
3050  }
3051  if (!mess) {
3052  // we get here in case the remote server died
3053  MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3054  return -1;
3055  }
3056  Int_t rc = 0;
3057 
3058  Int_t what = mess->What();
3059  TSlave *sl = FindSlave(s);
3060  rc = HandleInputMessage(sl, mess, deactonfail);
3061  if (rc == 1 && (endtype >= 0) && (what != endtype))
3062  // This message was for the base monitor in recursive case
3063  rc = 2;
3064 
3065  // We are done successfully
3066  return rc;
3067 }
3068 
3069 ////////////////////////////////////////////////////////////////////////////////
3070 /// Analyze the received message.
3071 /// Returns 0 on success (1 if this is the last message from this socket), -1 if
3072 /// any failure occurs.
3073 
3075 {
3076  char str[512];
3077  TObject *obj;
3078  Int_t rc = 0;
3079 
3080  if (!mess || !sl) {
3081  Warning("HandleInputMessage", "given an empty message or undefined worker");
3082  return -1;
3083  }
3084  Bool_t delete_mess = kTRUE;
3085  TSocket *s = sl->GetSocket();
3086  if (!s) {
3087  Warning("HandleInputMessage", "worker socket is undefined");
3088  return -1;
3089  }
3090 
3091  // The message type
3092  Int_t what = mess->What();
3093 
3094  PDB(kCollect,3)
3095  Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3096 
3097  switch (what) {
3098 
3099  case kMESS_OK:
3100  // Add the message to the list
3101  fRecvMessages->Add(mess);
3102  delete_mess = kFALSE;
3103  break;
3104 
3105  case kMESS_OBJECT:
3106  if (fPlayer) fPlayer->HandleRecvHisto(mess);
3107  break;
3108 
3109  case kPROOF_FATAL:
3110  { TString msg;
3111  if ((mess->BufferSize() > mess->Length()))
3112  (*mess) >> msg;
3113  if (msg.IsNull()) {
3114  MarkBad(s, "received kPROOF_FATAL");
3115  } else {
3116  MarkBad(s, msg);
3117  }
3118  }
3119  if (fProgressDialogStarted) {
3120  // Finalize the progress dialog
3121  Emit("StopProcess(Bool_t)", kTRUE);
3122  }
3123  break;
3124 
3125  case kPROOF_STOP:
3126  // Stop collection from this worker
3127  Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3128  sl->GetOrdinal());
3129  rc = 1;
3130  break;
3131 
3132  case kPROOF_GETTREEHEADER:
3133  // Add the message to the list
3134  fRecvMessages->Add(mess);
3135  delete_mess = kFALSE;
3136  rc = 1;
3137  break;
3138 
3139  case kPROOF_TOUCH:
3140  // send a request for touching the remote admin file
3141  {
3142  sl->Touch();
3143  }
3144  break;
3145 
3146  case kPROOF_GETOBJECT:
3147  // send slave object it asks for
3148  mess->ReadString(str, sizeof(str));
3149  obj = gDirectory->Get(str);
3150  if (obj)
3151  s->SendObject(obj);
3152  else
3153  s->Send(kMESS_NOTOK);
3154  break;
3155 
3156  case kPROOF_GETPACKET:
3157  {
3158  PDB(kGlobal,2)
3159  Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3160  TDSetElement *elem = 0;
3161  elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3162 
3163  if (elem != (TDSetElement*) -1) {
3164  TMessage answ(kPROOF_GETPACKET);
3165  answ << elem;
3166  s->Send(answ);
3167 
3168  while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3169  TPair *p = (TPair*) fWaitingSlaves->First();
3170  s = (TSocket*) p->Key();
3171  TMessage *m = (TMessage*) p->Value();
3172 
3173  elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3174  if (elem != (TDSetElement*) -1) {
3176  a << elem;
3177  s->Send(a);
3178  // remove has to happen via Links because TPair does not have
3179  // a Compare() function and therefore RemoveFirst() and
3180  // Remove(TObject*) do not work
3182  delete p;
3183  delete m;
3184  } else {
3185  break;
3186  }
3187  }
3188  } else {
3189  if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3190  fWaitingSlaves->Add(new TPair(s, mess));
3191  delete_mess = kFALSE;
3192  }
3193  }
3194  break;
3195 
3196  case kPROOF_LOGFILE:
3197  {
3198  Int_t size;
3199  (*mess) >> size;
3200  PDB(kGlobal,2)
3201  Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3202  RecvLogFile(s, size);
3203  }
3204  break;
3205 
3206  case kPROOF_LOGDONE:
3207  (*mess) >> sl->fStatus >> sl->fParallel;
3208  PDB(kCollect,2)
3209  Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3210  sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3211  if (sl->fStatus != 0) {
3212  // Return last nonzero status
3213  fStatus = sl->fStatus;
3214  // Deactivate the worker, if required
3215  if (deactonfail) DeactivateWorker(sl->fOrdinal);
3216  }
3217  // Remove from the workers-ready list
3220  fWrksOutputReady->Remove(sl);
3221  }
3222  rc = 1;
3223  break;
3224 
3225  case kPROOF_GETSTATS:
3226  {
3227  (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3228  >> sl->fWorkDir >> sl->fProofWorkDir;
3229  PDB(kCollect,2)
3230  Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3231  TString img;
3232  if ((mess->BufferSize() > mess->Length()))
3233  (*mess) >> img;
3234  // Set image
3235  if (img.IsNull()) {
3236  if (sl->fImage.IsNull())
3237  sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3238  sl->fProofWorkDir.Data());
3239  } else {
3240  sl->fImage = img;
3241  }
3242  PDB(kGlobal,2)
3243  Info("HandleInputMessage",
3244  "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3245 
3246  fBytesRead += sl->fBytesRead;
3247  fRealTime += sl->fRealTime;
3248  fCpuTime += sl->fCpuTime;
3249  rc = 1;
3250  }
3251  break;
3252 
3253  case kPROOF_GETPARALLEL:
3254  {
3255  Bool_t async = kFALSE;
3256  (*mess) >> sl->fParallel;
3257  if ((mess->BufferSize() > mess->Length()))
3258  (*mess) >> async;
3259  rc = (async) ? 0 : 1;
3260  }
3261  break;
3262 
3263  case kPROOF_CHECKFILE:
3264  { // New servers (>= 5.22) send the status
3265  if ((mess->BufferSize() > mess->Length())) {
3266  (*mess) >> fCheckFileStatus;
3267  } else {
3268  // For old servers this meant success (failure was signaled with the
3269  // dangerous kPROOF_FATAL)
3270  fCheckFileStatus = 1;
3271  }
3272  rc = 1;
3273  }
3274  break;
3275 
3276  case kPROOF_SENDFILE:
3277  { // New server: signals ending of sendfile operation
3278  rc = 1;
3279  }
3280  break;
3281 
3282  case kPROOF_PACKAGE_LIST:
3283  {
3284  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3285  Int_t type = 0;
3286  (*mess) >> type;
3287  switch (type) {
3291  if (fEnabledPackages) {
3293  } else {
3294  Error("HandleInputMessage",
3295  "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3296  }
3297  break;
3298  case TProof::kListPackages:
3301  if (fAvailablePackages) {
3303  } else {
3304  Error("HandleInputMessage",
3305  "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3306  }
3307  break;
3308  default:
3309  Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3310  }
3311  }
3312  break;
3313 
3314  case kPROOF_SENDOUTPUT:
3315  {
3316  // We start measuring the merging time
3317  fPlayer->SetMerging();
3318 
3319  // Worker is ready to send output: make sure the relevant bit is reset
3321  PDB(kGlobal,2)
3322  Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3323  // Create the list if not yet done
3324  if (!fWrksOutputReady) {
3325  fWrksOutputReady = new TList;
3327  }
3328  fWrksOutputReady->Add(sl);
3329  }
3330  break;
3331 
3332  case kPROOF_OUTPUTOBJECT:
3333  {
3334  // We start measuring the merging time
3335  fPlayer->SetMerging();
3336 
3337  PDB(kGlobal,2)
3338  Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3339  Int_t type = 0;
3340  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3342  Info("HandleInputMessage", "finalization on %s started ...", prefix);
3344  }
3345 
3346  while ((mess->BufferSize() > mess->Length())) {
3347  (*mess) >> type;
3348  // If a query result header, add it to the player list
3349  if (fPlayer) {
3350  if (type == 0) {
3351  // Retrieve query result instance (output list not filled)
3352  TQueryResult *pq =
3354  if (pq) {
3355  // Add query to the result list in TProofPlayer
3356  fPlayer->AddQueryResult(pq);
3357  fPlayer->SetCurrentQuery(pq);
3358  // And clear the output list, as we start merging a new set of results
3359  if (fPlayer->GetOutputList())
3360  fPlayer->GetOutputList()->Clear();
3361  // Add the unique query tag as TNamed object to the input list
3362  // so that it is available in TSelectors for monitoring
3363  TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3364  if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3365  fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3366  fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3367  } else {
3368  Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3369  }
3370  } else if (type > 0) {
3371  // Read object
3372  TObject *o = mess->ReadObject(TObject::Class());
3373  // Increment counter on the client side
3375  TString msg;
3376  Bool_t changed = kFALSE;
3377  msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3378  if (gProofServ) {
3380  } else if (IsTty() || changed) {
3381  fprintf(stderr, "%s\r", msg.Data());
3382  }
3383  // Add or merge it
3384  if ((fPlayer->AddOutputObject(o) == 1)) {
3385  // Remove the object if it has been merged
3386  SafeDelete(o);
3387  }
3388  if (type > 1) {
3389  // Update the merger progress info
3391  if (TestBit(TProof::kIsClient) && !IsLite()) {
3392  // In PROOFLite this has to be done once only in TProofLite::Process
3394  if (pq) {
3396  // Add input objects (do not override remote settings, if any)
3397  TObject *xo = 0;
3398  TIter nxin(fPlayer->GetInputList());
3399  // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3400  if (!pq->GetInputList()) pq->SetInputList(new TList());
3401  while ((xo = nxin()))
3402  if (!pq->GetInputList()->FindObject(xo->GetName()))
3403  pq->AddInput(xo->Clone());
3404  // If the last object, notify the GUI that the result arrived
3405  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3406  }
3407  // Processing is over
3408  UpdateDialog();
3409  }
3410  }
3411  }
3412  } else {
3413  Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3414  }
3415  }
3416  }
3417  break;
3418 
3419  case kPROOF_OUTPUTLIST:
3420  {
3421  // We start measuring the merging time
3422 
3423  PDB(kGlobal,2)
3424  Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3425  TList *out = 0;
3426  if (fPlayer) {
3427  fPlayer->SetMerging();
3428  if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3429  out = (TList *) mess->ReadObject(TList::Class());
3430  } else {
3431  TQueryResult *pq =
3433  if (pq) {
3434  // Add query to the result list in TProofPlayer
3435  fPlayer->AddQueryResult(pq);
3436  fPlayer->SetCurrentQuery(pq);
3437  // To avoid accidental cleanups from anywhere else
3438  // remove objects from gDirectory and clone the list
3439  out = pq->GetOutputList();
3440  CleanGDirectory(out);
3441  out = (TList *) out->Clone();
3442  // Notify the GUI that the result arrived
3443  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3444  } else {
3445  PDB(kGlobal,2)
3446  Info("HandleInputMessage",
3447  "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3448  }
3449  }
3450  if (out) {
3451  out->SetOwner();
3452  fPlayer->AddOutput(out); // Incorporate the list
3453  SafeDelete(out);
3454  } else {
3455  PDB(kGlobal,2)
3456  Info("HandleInputMessage",
3457  "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3458  }
3459  } else {
3460  Warning("HandleInputMessage",
3461  "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3462  }
3463  // On clients at this point processing is over
3464  if (TestBit(TProof::kIsClient) && !IsLite())
3465  UpdateDialog();
3466  }
3467  break;
3468 
3469  case kPROOF_QUERYLIST:
3470  {
3471  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3472  (*mess) >> fOtherQueries >> fDrawQueries;
3473  if (fQueries) {
3474  fQueries->Delete();
3475  delete fQueries;
3476  fQueries = 0;
3477  }
3478  fQueries = (TList *) mess->ReadObject(TList::Class());
3479  }
3480  break;
3481 
3482  case kPROOF_RETRIEVE:
3483  {
3484  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3485  TQueryResult *pq =
3487  if (pq && fPlayer) {
3488  fPlayer->AddQueryResult(pq);
3489  // Notify the GUI that the result arrived
3490  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3491  } else {
3492  PDB(kGlobal,2)
3493  Info("HandleInputMessage",
3494  "kPROOF_RETRIEVE: query result missing or player undefined");
3495  }
3496  }
3497  break;
3498 
3499  case kPROOF_MAXQUERIES:
3500  {
3501  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3502  Int_t max = 0;
3503 
3504  (*mess) >> max;
3505  Printf("Number of queries fully kept remotely: %d", max);
3506  }
3507  break;
3508 
3509  case kPROOF_SERVERSTARTED:
3510  {
3511  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3512 
3513  UInt_t tot = 0, done = 0;
3514  TString action;
3515  Bool_t st = kTRUE;
3516 
3517  (*mess) >> action >> tot >> done >> st;
3518 
3519  if (TestBit(TProof::kIsClient)) {
3520  if (tot) {
3521  TString type = (action.Contains("submas")) ? "submasters"
3522  : "workers";
3523  Int_t frac = (Int_t) (done*100.)/tot;
3524  char msg[512] = {0};
3525  if (frac >= 100) {
3526  snprintf(msg, 512, "%s: OK (%d %s) \n",
3527  action.Data(),tot, type.Data());
3528  } else {
3529  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3530  action.Data(), done, tot, frac);
3531  }
3532  if (fSync)
3533  fprintf(stderr,"%s", msg);
3534  else
3535  NotifyLogMsg(msg, 0);
3536  }
3537  // Notify GUIs
3538  StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3539  } else {
3540 
3541  // Just send the message one level up
3543  m << action << tot << done << st;
3544  gProofServ->GetSocket()->Send(m);
3545  }
3546  }
3547  break;
3548 
3549  case kPROOF_DATASET_STATUS:
3550  {
3551  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3552 
3553  UInt_t tot = 0, done = 0;
3554  TString action;
3555  Bool_t st = kTRUE;
3556 
3557  (*mess) >> action >> tot >> done >> st;
3558 
3559  if (TestBit(TProof::kIsClient)) {
3560  if (tot) {
3561  TString type = "files";
3562  Int_t frac = (Int_t) (done*100.)/tot;
3563  char msg[512] = {0};
3564  if (frac >= 100) {
3565  snprintf(msg, 512, "%s: OK (%d %s) \n",
3566  action.Data(),tot, type.Data());
3567  } else {
3568  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3569  action.Data(), done, tot, frac);
3570  }
3571  if (fSync)
3572  fprintf(stderr,"%s", msg);
3573  else
3574  NotifyLogMsg(msg, 0);
3575  }
3576  // Notify GUIs
3577  DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3578  } else {
3579 
3580  // Just send the message one level up
3582  m << action << tot << done << st;
3583  gProofServ->GetSocket()->Send(m);
3584  }
3585  }
3586  break;
3587 
3588  case kPROOF_STARTPROCESS:
3589  {
3590  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3591 
3592  // For Proof-Lite this variable is the number of workers and is set
3593  // by the player
3594  if (!IsLite()) {
3595  fNotIdle = 1;
3596  fIsWaiting = kFALSE;
3597  }
3598 
3599  // Redirect the output, if needed
3600  fRedirLog = (fSync) ? fRedirLog : kTRUE;
3601 
3602  // The signal is used on masters by XrdProofdProtocol to catch
3603  // the start of processing; on clients it allows to update the
3604  // progress dialog
3605  if (!TestBit(TProof::kIsMaster)) {
3606 
3607  // This is the end of preparation
3608  fQuerySTW.Stop();
3610  PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3611 
3612  TString selec;
3613  Int_t dsz = -1;
3614  Long64_t first = -1, nent = -1;
3615  (*mess) >> selec >> dsz >> first >> nent;
3616  // Start or reset the progress dialog
3617  if (!gROOT->IsBatch()) {
3618  if (fProgressDialog &&
3620  if (!fProgressDialogStarted) {
3621  fProgressDialog->ExecPlugin(5, this,
3622  selec.Data(), dsz, first, nent);
3624  } else {
3625  ResetProgressDialog(selec, dsz, first, nent);
3626  }
3627  }
3629  }
3630  }
3631  }
3632  break;
3633 
3634  case kPROOF_ENDINIT:
3635  {
3636  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3637 
3638  if (TestBit(TProof::kIsMaster)) {
3639  if (fPlayer)
3640  fPlayer->SetInitTime();
3641  }
3642  }
3643  break;
3644 
3645  case kPROOF_SETIDLE:
3646  {
3647  PDB(kGlobal,2)
3648  Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3649 
3650  // The session is idle
3651  if (IsLite()) {
3652  if (fNotIdle > 0) {
3653  fNotIdle--;
3654  PDB(kGlobal,2)
3655  Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3656  } else {
3657  Warning("HandleInputMessage",
3658  "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3659  sl->GetOrdinal());
3660  }
3661  } else {
3662  fNotIdle = 0;
3663  // Check if the query has been enqueued
3664  if ((mess->BufferSize() > mess->Length()))
3665  (*mess) >> fIsWaiting;
3666  }
3667  }
3668  break;
3669 
3670  case kPROOF_QUERYSUBMITTED:
3671  {
3672  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3673 
3674  // We have received the sequential number
3675  (*mess) >> fSeqNum;
3676  Bool_t sync = fSync;
3677  if ((mess->BufferSize() > mess->Length()))
3678  (*mess) >> sync;
3679  if (sync != fSync && fSync) {
3680  // The server required to switch to asynchronous mode
3681  Activate();
3682  fSync = kFALSE;
3683  }
3684  DisableGoAsyn();
3685  // Check if the query has been enqueued
3686  fIsWaiting = kTRUE;
3687  // For Proof-Lite this variable is the number of workers and is set by the player
3688  if (!IsLite())
3689  fNotIdle = 1;
3690 
3691  rc = 1;
3692  }
3693  break;
3694 
3695  case kPROOF_SESSIONTAG:
3696  {
3697  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3698 
3699  // We have received the unique tag and save it as name of this object
3700  TString stag;
3701  (*mess) >> stag;
3702  SetName(stag);
3703  // In the TSlave object
3704  sl->SetSessionTag(stag);
3705  // Server may have also sent the group
3706  if ((mess->BufferSize() > mess->Length()))
3707  (*mess) >> fGroup;
3708  // Server may have also sent the user
3709  if ((mess->BufferSize() > mess->Length())) {
3710  TString usr;
3711  (*mess) >> usr;
3712  if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3713  }
3714  }
3715  break;
3716 
3717  case kPROOF_FEEDBACK:
3718  {
3719  PDB(kGlobal,2)
3720  Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3721  TList *out = (TList *) mess->ReadObject(TList::Class());
3722  out->SetOwner();
3723  if (fPlayer)
3724  fPlayer->StoreFeedback(sl, out); // Adopts the list
3725  else
3726  // Not yet ready: stop collect asap
3727  rc = 1;
3728  }
3729  break;
3730 
3731  case kPROOF_AUTOBIN:
3732  {
3733  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3734 
3735  TString name;
3736  Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3737 
3738  (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3739 
3740  if (fPlayer) fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
3741 
3742  TMessage answ(kPROOF_AUTOBIN);
3743 
3744  answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3745 
3746  s->Send(answ);
3747  }
3748  break;
3749 
3750  case kPROOF_PROGRESS:
3751  {
3752  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3753 
3754  if (GetRemoteProtocol() > 25) {
3755  // New format
3756  TProofProgressInfo *pi = 0;
3757  (*mess) >> pi;
3758  fPlayer->Progress(sl,pi);
3759  } else if (GetRemoteProtocol() > 11) {
3760  Long64_t total, processed, bytesread;
3761  Float_t initTime, procTime, evtrti, mbrti;
3762  (*mess) >> total >> processed >> bytesread
3763  >> initTime >> procTime
3764  >> evtrti >> mbrti;
3765  if (fPlayer)
3766  fPlayer->Progress(sl, total, processed, bytesread,
3767  initTime, procTime, evtrti, mbrti);
3768 
3769  } else {
3770  // Old format
3771  Long64_t total, processed;
3772  (*mess) >> total >> processed;
3773  if (fPlayer)
3774  fPlayer->Progress(sl, total, processed);
3775  }
3776  }
3777  break;
3778 
3779  case kPROOF_STOPPROCESS:
3780  {
3781  // This message is sent from a worker that finished processing.
3782  // We determine whether it was asked to finish by the
3783  // packetizer or stopped during processing a packet
3784  // (by TProof::RemoveWorkers() or by an external signal).
3785  // In the latter case call packetizer->MarkBad.
3786  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3787 
3788  Long64_t events = 0;
3789  Bool_t abort = kFALSE;
3790  TProofProgressStatus *status = 0;
3791 
3792  if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3793  (*mess) >> status >> abort;
3794  } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3795  (*mess) >> events >> abort;
3796  } else {
3797  (*mess) >> events;
3798  }
3799  if (fPlayer) {
3800  if (fProtocol > 18) {
3801  TList *listOfMissingFiles = 0;
3802  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3803  listOfMissingFiles = new TList();
3804  listOfMissingFiles->SetName("MissingFiles");
3805  if (fPlayer)
3806  fPlayer->AddOutputObject(listOfMissingFiles);
3807  }
3808  if (fPlayer->GetPacketizer()) {
3809  Int_t ret =
3810  fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3811  if (ret > 0)
3812  fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3813  // This object is now owned by the packetizer
3814  status = 0;
3815  }
3816  if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3817  } else {
3818  fPlayer->AddEventsProcessed(events);
3819  }
3820  }
3821  SafeDelete(status);
3822  if (!TestBit(TProof::kIsMaster))
3823  Emit("StopProcess(Bool_t)", abort);
3824  break;
3825  }
3826 
3827  case kPROOF_SUBMERGER:
3828  {
3829  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3830  HandleSubmerger(mess, sl);
3831  }
3832  break;
3833 
3834  case kPROOF_GETSLAVEINFO:
3835  {
3836  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3837 
3838  Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3839  Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3840  TList* tmpinfo = 0;
3841  (*mess) >> tmpinfo;
3842  if (tmpinfo == 0) {
3843  Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3844  } else {
3845  tmpinfo->SetOwner(kFALSE);
3846  Int_t nentries = tmpinfo->GetSize();
3847  for (Int_t i=0; i<nentries; i++) {
3848  TSlaveInfo* slinfo =
3849  dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3850  if (slinfo) {
3851  // If PROOF-Lite
3852  if (IsLite()) slinfo->fHostName = gSystem->HostName();
3853  // Check if we have already a instance for this worker
3854  TIter nxw(fSlaveInfo);
3855  TSlaveInfo *ourwi = 0;
3856  while ((ourwi = (TSlaveInfo *)nxw())) {
3857  if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3858  ourwi->SetSysInfo(slinfo->GetSysInfo());
3859  ourwi->fHostName = slinfo->GetName();
3860  if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3861  ourwi->fDataDir = slinfo->GetDataDir();
3862  break;
3863  }
3864  }
3865  if (!ourwi) {
3866  fSlaveInfo->Add(slinfo);
3867  } else {
3868  slinfo = ourwi;
3869  }
3870  if (slinfo->fStatus != TSlaveInfo::kBad) {
3871  if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3872  if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3873  }
3874  if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3875  slinfo->fMsd = sl->GetMsd();
3876  }
3877  }
3878  delete tmpinfo;
3879  rc = 1;
3880  }
3881  }
3882  break;
3883 
3884  case kPROOF_VALIDATE_DSET:
3885  {
3886  PDB(kGlobal,2)
3887  Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3888  TDSet* dset = 0;
3889  (*mess) >> dset;
3890  if (!fDSet)
3891  Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3892  else
3893  fDSet->Validate(dset);
3894  delete dset;
3895  }
3896  break;
3897 
3898  case kPROOF_DATA_READY:
3899  {
3900  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3901  Bool_t dataready = kFALSE;
3902  Long64_t totalbytes, bytesready;
3903  (*mess) >> dataready >> totalbytes >> bytesready;
3904  fTotalBytes += totalbytes;
3905  fBytesReady += bytesready;
3906  if (dataready == kFALSE) fDataReady = dataready;
3907  }
3908  break;
3909 
3910  case kPROOF_PING:
3911  // do nothing (ping is already acknowledged)
3912  break;
3913 
3914  case kPROOF_MESSAGE:
3915  {
3916  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3917 
3918  // Read the message text and, if present, the line-feed flag
3919  TString msg;
3920  (*mess) >> msg;
3921  Bool_t lfeed = kTRUE;
3922  if ((mess->BufferSize() > mess->Length()))
3923  (*mess) >> lfeed;
3924 
3925  if (TestBit(TProof::kIsClient)) {
3926 
3927  if (fSync) {
3928  // Notify locally
3929  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3930  } else {
3931  // Notify locally taking care of redirection, windows logs, ...
3932  NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3933  }
3934  } else {
3935 
3936  // The message is logged for debugging purposes.
3937  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3938  if (gProofServ) {
3939  // We hide it during normal operations
3941 
3942  // And send the message one level up
3943  gProofServ->SendAsynMessage(msg, lfeed);
3944  }
3945  }
3946  }
3947  break;
3948 
3949  case kPROOF_VERSARCHCOMP:
3950  {
3951  TString vac;
3952  (*mess) >> vac;
3953  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3954  Int_t from = 0;
3955  TString vers, archcomp;
3956  if (vac.Tokenize(vers, from, "|"))
3957  vac.Tokenize(archcomp, from, "|");
3958  sl->SetArchCompiler(archcomp);
3959  vers.ReplaceAll(":","|");
3960  sl->SetROOTVersion(vers);
3961  }
3962  break;
3963 
3964  default:
3965  {
3966  Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3967  sl->GetOrdinal(), what);
3968  }
3969  break;
3970  }
3971 
3972  // Cleanup
3973  if (delete_mess)
3974  delete mess;
3975 
3976  // We are done successfully
3977  return rc;
3978 }
3979 
3980 ////////////////////////////////////////////////////////////////////////////////
3981 /// Process a message of type kPROOF_SUBMERGER
3982 
3984 {
3985  // Message sub-type
3986  Int_t type = 0;
3987  (*mess) >> type;
3988  TSocket *s = sl->GetSocket();
3989 
3990  switch (type) {
3991  case kOutputSent:
3992  {
3993  if (IsEndMaster()) {
3994  Int_t merger_id = -1;
3995  (*mess) >> merger_id;
3996 
3997  PDB(kSubmerger, 2)
3998  Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
3999  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4000 
4001  if (!fMergers || fMergers->GetSize() <= merger_id) {
4002  Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4003  break;
4004  }
4005  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4006  mi->SetMergedWorker();
4007  if (mi->AreAllWorkersMerged()) {
4008  mi->Deactivate();
4009  if (GetActiveMergersCount() == 0) {
4010  fMergers->Clear();
4011  delete fMergers;
4012  fMergersSet = kFALSE;
4013  fMergersCount = -1;
4014  fLastAssignedMerger = 0;
4015  PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4016  }
4017  }
4018  } else {
4019  PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4020  }
4021  }
4022  break;
4023 
4024  case kMergerDown:
4025  {
4026  Int_t merger_id = -1;
4027  (*mess) >> merger_id;
4028 
4029  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4030 
4031  if (!fMergers || fMergers->GetSize() <= merger_id) {
4032  Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4033  break;
4034  }
4035 
4036  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4037  if (!mi->IsActive()) {
4038  break;
4039  } else {
4040  mi->Deactivate();
4041  }
4042 
4043  // Stop the invalid merger in the case it is still listening
4044  TMessage stop(kPROOF_SUBMERGER);
4045  stop << Int_t(kStopMerging);
4046  stop << 0;
4047  s->Send(stop);
4048 
4049  // Ask for results from merger (only original results from this node as worker are returned)
4050  AskForOutput(mi->GetMerger());
4051 
4052  // Ask for results from all workers assigned to this merger
4053  TIter nxo(mi->GetWorkers());
4054  TObject * o = 0;
4055  while ((o = nxo())) {
4056  AskForOutput((TSlave *)o);
4057  }
4058  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4059  }
4060  break;
4061 
4062  case kOutputSize:
4063  {
4064  if (IsEndMaster()) {
4065  PDB(kSubmerger, 2)
4066  Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4067 
4068  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4069  if (!fFinalizationRunning) {
4070  Info("HandleSubmerger", "finalization on %s started ...", prefix);
4072  }
4073 
4074  Int_t output_size = 0;
4075  Int_t merging_port = 0;
4076  (*mess) >> output_size >> merging_port;
4077 
4078  PDB(kSubmerger, 2) Info("HandleSubmerger",
4079  "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4080  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4081  TString msg;
4082  if (!fMergersSet) {
4083 
4085 
4086  // First pass - setting number of mergers according to user or dynamically
4087  fMergersCount = -1; // No mergers used if not set by user
4088  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4089  if (mc) fMergersCount = mc->GetVal(); // Value set by user
4090  TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4091  if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4092 
4093  // Mergers count specified by user but not valid
4094  if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4095  msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4096  prefix, fMergersCount, activeWorkers);
4097  if (gProofServ)
4099  else
4100  Printf("%s",msg.Data());
4101  fMergersCount = 0;
4102  }
4103  // Mergers count will be set dynamically
4104  if ((fMergersCount == 0) && (!fMergersByHost)) {
4105  if (activeWorkers > 1) {
4106  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4107  if (activeWorkers / fMergersCount < 2)
4108  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4109  }
4110  if (fMergersCount > 1)
4111  msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4112  prefix, fMergersCount, activeWorkers);
4113  else {
4114  msg.Form("%s: No mergers will be used for %d workers",
4115  prefix, activeWorkers);
4116  fMergersCount = -1;
4117  }
4118  if (gProofServ)
4120  else
4121  Printf("%s",msg.Data());
4122  } else if (fMergersByHost) {
4123  // We force mergers at host level to minimize network traffic
4124  if (activeWorkers > 1) {
4125  fMergersCount = 0;
4126  THashList hosts;
4127  TIter nxwk(fSlaves);
4128  TObject *wrk = 0;
4129  while ((wrk = nxwk())) {
4130  if (!hosts.FindObject(wrk->GetName())) {
4131  hosts.Add(new TObjString(wrk->GetName()));
4132  fMergersCount++;
4133  }
4134  }
4135  }
4136  if (fMergersCount > 1)
4137  msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4138  prefix, fMergersCount, activeWorkers);
4139  else {
4140  msg.Form("%s: No mergers will be used for %d workers",
4141  prefix, activeWorkers);
4142  fMergersCount = -1;
4143  }
4144  if (gProofServ)
4146  else
4147  Printf("%s",msg.Data());
4148  } else {
4149  msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4150  prefix, fMergersCount, activeWorkers);
4151  if (gProofServ)
4153  else
4154  Printf("%s",msg.Data());
4155  }
4156 
4157  // We started merging; we call it here because fMergersCount is still the original number
4158  // and can be saved internally
4160 
4161  // Update merger counters (new workers are not yet active)
4163 
4164  if (fMergersCount > 0) {
4165 
4166  fMergers = new TList();
4167  fLastAssignedMerger = 0;
4168  // Total number of workers, which will not act as mergers ('pure workers')
4169  fWorkersToMerge = (activeWorkers - fMergersCount);
4170  // Establish the first merger
4171  if (!CreateMerger(sl, merging_port)) {
4172  // Cannot establish first merger
4173  AskForOutput(sl);
4174  fWorkersToMerge--;
4175  fMergersCount--;
4176  }
4178  } else {
4179  AskForOutput(sl);
4180  }
4181  fMergersSet = kTRUE;
4182  } else {
4183  // Multiple pass
4184  if (fMergersCount == -1) {
4185  // No mergers. Workers send their outputs directly to master
4186  AskForOutput(sl);
4187  } else {
4188  if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4189  RedirectWorker(s, sl, output_size);
4190  fRedirectNext--;
4191  } else {
4192  Bool_t newMerger = kTRUE;
4193  if (fMergersByHost) {
4194  TIter nxmg(fMergers);
4195  TMergerInfo *mgi = 0;
4196  while ((mgi = (TMergerInfo *) nxmg())) {
4197  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4198  newMerger = kFALSE;
4199  break;
4200  }
4201  }
4202  }
4203  if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4204  // Still not enough mergers established
4205  if (!CreateMerger(sl, merging_port)) {
4206  // Cannot establish a merger
4207  AskForOutput(sl);
4208  fWorkersToMerge--;
4209  fMergersCount--;
4210  }
4211  } else
4212  RedirectWorker(s, sl, output_size);
4213  }
4214  }
4215  }
4216  } else {
4217  Error("HandleSubMerger","kOutputSize received not on endmaster!");
4218  }
4219  }
4220  break;
4221  }
4222 }
4223 
4224 ////////////////////////////////////////////////////////////////////////////////
4225 /// Redirect output of worker sl to some merger
4226 
4227 void TProof::RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size)
4228 {
4229  Int_t merger_id = -1;
4230 
4231  if (fMergersByHost) {
4232  for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4233  TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4234  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4235  merger_id = i;
4236  break;
4237  }
4238  }
4239  } else {
4240  merger_id = FindNextFreeMerger();
4241  }
4242 
4243  if (merger_id == -1) {
4244  // No free merger (probably it had crashed before)
4245  AskForOutput(sl);
4246  } else {
4247  TMessage sendoutput(kPROOF_SUBMERGER);
4248  sendoutput << Int_t(kSendOutput);
4249  PDB(kSubmerger, 2)
4250  Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4251 
4252  PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4253  if (!fMergers || fMergers->GetSize() <= merger_id) {
4254  Error("RedirectWorker", "#%d not in list ", merger_id);
4255  return;
4256  }
4257  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4258 
4259  TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName();
4260  sendoutput << merger_id;
4261  sendoutput << hname;
4262  sendoutput << mi->GetPort();
4263  s->Send(sendoutput);
4264  mi->AddMergedObjects(output_size);
4265  mi->AddWorker(sl);
4266  }
4267 }
4268 
4269 ////////////////////////////////////////////////////////////////////////////////
4270 /// Return a merger, which is both active and still accepts some workers to be
4271 /// assigned to it. It works on the 'round-robin' basis.
4272 
4274 {
 // Pass 1: starting at the round-robin cursor fLastAssignedMerger, look for the first
 // merger that is active and has not yet had all of its workers assigned.
 // NOTE(review): the loop body (listing line 4278) is absent from this listing;
 // presumably it advances fLastAssignedMerger - confirm against the real source.
4275  while (fLastAssignedMerger < fMergers->GetSize() &&
4276  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4277  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4279  }
4280 
 // Cursor reached the end of the list: rewind it to 0 for a second pass. Otherwise
 // return the index found and leave the cursor one past it (post-increment).
4281  if (fLastAssignedMerger == fMergers->GetSize()) {
4282  fLastAssignedMerger = 0;
4283  } else {
4284  return fLastAssignedMerger++;
4285  }
4286 
 // Pass 2: same test as pass 1, now from the beginning of the list.
 // NOTE(review): loop body (listing line 4290) is likewise absent from this listing.
4287  while (fLastAssignedMerger < fMergers->GetSize() &&
4288  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4289  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4291  }
4292 
 // -1 tells the caller that no merger can take another worker
4293  if (fLastAssignedMerger == fMergers->GetSize()) {
4294  return -1;
4295  } else {
4296  return fLastAssignedMerger++;
4297  }
4298 }
4299 
4300 ////////////////////////////////////////////////////////////////////////////////
4301 /// Master asks for output from worker sl
4302 
4304 {
 // Build a kPROOF_SUBMERGER message carrying the kSendOutput action
4305  TMessage sendoutput(kPROOF_SUBMERGER);
4306  sendoutput << Int_t(kSendOutput);
4307 
4308  PDB(kSubmerger, 2) Info("AskForOutput",
4309  "worker %s was asked to send its output to master",
4310  sl->GetOrdinal());
4311 
 // Same three fields that RedirectWorker sends (merger id, host, port), here filled
 // with -1, "master", -1
4312  sendoutput << -1;
4313  sendoutput << TString("master");
4314  sendoutput << -1;
 // The request goes out on the worker's own socket
4315  sl->GetSocket()->Send(sendoutput);
 // In PROOF-Lite, bump the worker counter kept in fMergePrg
4316  if (IsLite()) fMergePrg.IncreaseNWrks();
4317 }
4318 
4319 ////////////////////////////////////////////////////////////////////////////////
4320 /// Final update of the progress dialog
4321 
4323 {
 // Nothing can be reported without a player
4324  if (!fPlayer) return;
4325 
 // NOTE(review): several statements of this function are absent from this listing
 // (listing lines 4327, 4331, 4337, 4343, 4347, 4355): the conditions opening the
 // 'abort' and 'stop' sections, the last argument of each Info() call and the bodies
 // of the oldest-protocol 'else' branches. Read the complete source before editing.
4326  // Handle abort ...
4328  if (fSync)
4329  Info("UpdateDialog",
4330  "processing was aborted - %lld events processed",
4332 
 // The Progress() overload used depends on the remote protocol version
4333  if (GetRemoteProtocol() > 11) {
4334  // New format
4335  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4336  } else {
4338  }
 // kTRUE is emitted for the abort case, kFALSE for the stop case below
4339  Emit("StopProcess(Bool_t)", kTRUE);
4340  }
4341 
4342  // Handle stop ...
4344  if (fSync)
4345  Info("UpdateDialog",
4346  "processing was stopped - %lld events processed",
4348 
 // Three formats here: protocol > 25 (10 arguments), > 11 (7 arguments), older
4349  if (GetRemoteProtocol() > 25) {
4350  // New format
4351  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4352  } else if (GetRemoteProtocol() > 11) {
4353  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4354  } else {
4356  }
4357  Emit("StopProcess(Bool_t)", kFALSE);
4358  }
4359 
 // The final signal carries -1 in every field; the signature emitted is chosen with
 // the same protocol thresholds used above
4360  // Final update of the dialog box
4361  if (GetRemoteProtocol() > 25) {
4362  // New format
4363  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4364  10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4365  (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4366  } else if (GetRemoteProtocol() > 11) {
4367  // New format
4368  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4369  7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4370  (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4371  } else {
4372  EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4373  }
4374 }
4375 
4376 ////////////////////////////////////////////////////////////////////////////////
4377 /// Activate the a-sync input handler.
4378 
4380 {
 // Walk the full list of slaves (fSlaves)
4381  TIter next(fSlaves);
4382  TSlave *sl;
4383 
 // Call Add() on the input handler of every slave that has one
4384  while ((sl = (TSlave*) next()))
4385  if (sl->GetInputHandler())
4386  sl->GetInputHandler()->Add();
4387 }
4388 
4389 ////////////////////////////////////////////////////////////////////////////////
4390 /// De-activate a-sync input handler.
4391 
4393 {
 // Walk the full list of slaves (fSlaves)
4394  TIter next(fSlaves);
4395  TSlave *sl;
4396 
 // Call Remove() on the input handler of every slave that has one
4397  while ((sl = (TSlave*) next()))
4398  if (sl->GetInputHandler())
4399  sl->GetInputHandler()->Remove();
4400 }
4401 
4402 ////////////////////////////////////////////////////////////////////////////////
4403 /// Get the active mergers count
4404 
4406 {
 // No mergers list: report zero
4407  if (!fMergers) return 0;
4408 
4409  Int_t active_mergers = 0;
4410 
 // Count the entries of fMergers for which IsActive() is true
4411  TIter mergers(fMergers);
4412  TMergerInfo *mi = 0;
4413  while ((mi = (TMergerInfo *)mergers())) {
4414  if (mi->IsActive()) active_mergers++;
4415  }
4416 
4417  return active_mergers;
4418 }
4419 
4420 ////////////////////////////////////////////////////////////////////////////////
4421 /// Create a new merger
4422 
4424 {
 // Promote worker 'sl' to merger listening on 'port'.
 // Returns kFALSE when 'port' is not positive, kTRUE once the worker has been
 // instructed and registered in fMergers.
4425  PDB(kSubmerger, 2)
4426  Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4427 
4428  PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4429 
4430  if (port <= 0) {
4431  PDB(kSubmerger,2)
4432  Info("CreateMerger", "cannot create merger on port %d - exit", port);
4433  return kFALSE;
4434  }
4435 
 // 'workers' = number of additional workers this merger will serve
4436  Int_t workers = -1;
4437  if (!fMergersByHost) {
 // NOTE(review): mergersToCreate is used as a divisor below; the visible callers
 // only get here while fMergersCount exceeds fMergers->GetSize(), so it should
 // be positive - confirm no other caller can make it zero.
4438  Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4439  // Number of pure workers, which are not simply divisible by mergers
4440  Int_t rest = fWorkersToMerge % mergersToCreate;
4441  // We add one more worker for each of the first 'rest' mergers being established
4442  if (rest > 0 && fMergers->GetSize() < rest) {
4443  rest = 1;
4444  } else {
4445  rest = 0;
4446  }
4447  workers = (fWorkersToMerge / mergersToCreate) + rest;
4448  } else {
 // One merger per host: it serves the other active workers whose name (host)
 // equals that of 'sl', hence the '- 1' excluding the merger itself
4449  Int_t workersOnHost = 0;
4450  for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4451  if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4452  }
4453  workers = workersOnHost - 1;
4454  }
4455 
4456  TString msg;
4457  msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4458 
 // NOTE(review): the statement of the gProofServ branch (listing line 4460) is absent
 // from this listing; without gProofServ the message is simply printed.
4459  if (gProofServ) {
4461  } else {
4462  Printf("%s",msg.Data());
4463  }
4464  TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4465 
 // Tell the worker to act as merger: action code, its merger id (the current size
 // of fMergers, i.e. the position it is about to take) and the number of workers
4466  TMessage bemerger(kPROOF_SUBMERGER);
4467  bemerger << Int_t(kBeMerger);
4468  bemerger << fMergers->GetSize();
4469  bemerger << workers;
4470  sl->GetSocket()->Send(bemerger);
4471 
4472  PDB(kSubmerger,2) Info("CreateMerger",
4473  "merger #%d (port: %d) for %d workers started",
4474  fMergers->GetSize(), port, workers);
4475 
 // Register the merger and take its share out of the workers still to be assigned
4476  fMergers->Add(merger);
4477  fWorkersToMerge = fWorkersToMerge - workers;
4478 
 // Counter consumed by the caller: while it is positive (and mergers are not by
 // host) incoming workers are redirected instead of creating further mergers
4479  fRedirectNext = workers / 2;
4480 
4481  PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4482  return kTRUE;
4483 }
4484 
4485 ////////////////////////////////////////////////////////////////////////////////
4486 /// Add a bad slave server to the bad slave list and remove it from
4487 /// the active list and from the two monitor objects. Assume that the work
4488 /// done by this worker was lost and ask the packetizer to reassign it.
4489 
4490 void TProof::MarkBad(TSlave *wrk, const char *reason)
4491 {
 // The recursive close mutex is held for the whole call
 // NOTE(review): listing lines 4537, 4587, 4607, 4617 and 4626 are absent from this
 // listing (see the notes at each spot); read the complete source before editing.
4492  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4493 
4494  // We may have been invalidated in the meanwhile: nothing to do in such a case
4495  if (!IsValid()) return;
4496 
4497  if (!wrk) {
4498  Error("MarkBad", "worker instance undefined: protocol error? ");
4499  return;
4500  }
4501 
 // Function-local static: built on the first call and reused by later calls
4502  // Local URL
4503  static TString thisurl;
4504  if (thisurl.IsNull()) {
4505  if (IsMaster()) {
4506  Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4507  thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4508  if (port > 0) thisurl += TString::Format(":%d", port);
4509  } else {
4510  thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4511  }
4512  }
4513 
 // Verbose notification unless the reason is exactly kPROOF_TerminateWorker or
 // kPROOF_WorkerIdleTO (strcmp() is non-zero for both), or when no reason is given
4514  if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4515  // Message for notification
4516  const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4517  TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4518  TString msg;
4519  msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4520  src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4521  (reason && strlen(reason)) ? reason : "unknown");
4522  Info("MarkBad", "%s", msg.Data());
4523  // Notify one level up, if the case
4524  // Add some hint for diagnostics
4525  if (gProofServ) {
4526  msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4527  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4528  } else {
4529  msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4530  }
4531  msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4532  msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4533  msg += TString::Format(" +++\n");
 // NOTE(review): with gProofServ the statement that consumes 'msg' (listing line
 // 4537) is absent from this listing; without it the message is printed locally.
4534  if (gProofServ) {
4535  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4536  "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
4538  } else {
4539  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4540  "Display(\"*\")\n\n", thisurl.Data());
4541  Printf("%s", msg.Data());
4542  }
4543  } else if (reason) {
 // Here the reason is one of the two special ones; the debug message is skipped
 // for kPROOF_WorkerIdleTO
4544  if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4545  Info("MarkBad", "worker %s at %s:%d asked to terminate",
4546  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4547  }
4548  }
4549 
4550  if (IsMaster() && reason) {
4551  if (strcmp(reason, kPROOF_TerminateWorker)) {
4552  // if the reason was not a planned termination
 // Make sure a "MissingFiles" list exists in the output; it is handed to the
 // packetizer below
4553  TList *listOfMissingFiles = 0;
4554  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4555  listOfMissingFiles = new TList();
4556  listOfMissingFiles->SetName("MissingFiles");
4557  if (fPlayer)
4558  fPlayer->AddOutputObject(listOfMissingFiles);
4559  }
4560  // If a query is being processed, assume that the work done by
4561  // the worker was lost and needs to be reassigned.
4562  TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4563  if (packetizer) {
4564  // the worker was lost so do resubmit the packets
4565  packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4566  }
4567  } else {
4568  // Tell the coordinator that we are gone
4569  if (gProofServ) {
 // Only the part of the ordinal after the last '.' is passed on
4570  TString ord(wrk->GetOrdinal());
4571  Int_t id = ord.Last('.');
4572  if (id != kNPOS) ord.Remove(0, id+1);
4573  gProofServ->ReleaseWorker(ord.Data());
4574  }
4575  }
4576  } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4577  // We are invalid after this
4578  fValid = kFALSE;
4579  }
4580 
 // Take the worker out of the active list and of both monitors
4581  fActiveSlaves->Remove(wrk);
4582  FindUniqueSlaves();
4583 
4584  fAllMonitor->Remove(wrk->GetSocket());
4585  fActiveMonitor->Remove(wrk->GetSocket());
4586 
 // NOTE(review): one statement (listing line 4587) is absent from this listing here.
4588 
4589  if (IsMaster()) {
4590  if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4591  // if the reason was a planned termination then delete the worker and
4592  // remove it from all the lists
4593  fSlaves->Remove(wrk);
4594  fBadSlaves->Remove(wrk);
4595  fActiveSlaves->Remove(wrk);
4596  fInactiveSlaves->Remove(wrk);
4597  fUniqueSlaves->Remove(wrk);
4598  fAllUniqueSlaves->Remove(wrk);
4599  fNonUniqueMasters->Remove(wrk);
4600 
4601  // we add it to the list of terminated slave infos instead, so that it
4602  // stays available in the .workers persistent file
 // NOTE(review): the 'if' paired with the 'else delete si' below (listing line
 // 4607) is absent from this listing; presumably it stores 'si' - confirm.
4603  TSlaveInfo *si = new TSlaveInfo(
4604  wrk->GetOrdinal(),
4605  Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4606  0, "", wrk->GetWorkDir());
4608  else delete si;
4609 
4610  delete wrk;
4611  } else {
 // Unplanned loss: the worker object is kept, moved to the bad list and closed
 // NOTE(review): one statement (listing line 4617) is absent before Close().
4612  fBadSlaves->Add(wrk);
4613  fActiveSlaves->Remove(wrk);
4614  fUniqueSlaves->Remove(wrk);
4615  fAllUniqueSlaves->Remove(wrk);
4616  fNonUniqueMasters->Remove(wrk);
4618  wrk->Close();
4619  // Update the mergers count, if needed
4620  if (fMergersSet) {
4621  Int_t mergersCount = -1;
4622  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4623  if (mc) mergersCount = mc->GetVal(); // Value set by user
4624  // Mergers count is set dynamically: recalculate it
 // NOTE(review): the declaration of 'activeWorkers' (listing line 4626) is
 // absent from this listing.
 // The count is the rounded square root of the active workers; if that leaves
 // fewer than 2 workers per merger the truncated square root is used instead
4625  if (mergersCount == 0) {
4627  if (activeWorkers > 1) {
4628  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4629  if (activeWorkers / fMergersCount < 2)
4630  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4631  }
4632  }
4633  }
4634  }
4635 
4636  // Update session workers files
4637  SaveWorkerInfo();
4638  } else {
4639  // On clients the proof session should be removed from the lists
4640  // and deleted, since it is not valid anymore
4641  fSlaves->Remove(wrk);
4642  if (fManager)
4643  fManager->DiscardSession(this);
4644  }
4645 }
4646 
4647 ////////////////////////////////////////////////////////////////////////////////
4648 /// Add slave with socket s to the bad slave list and remove it from
4649 /// the active list and from the two monitor objects.
4650 
4651 void TProof::MarkBad(TSocket *s, const char *reason)
4652 {
4653  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4654 
4655  // We may have been invalidated in the meanwhile: nothing to do in such a case
4656  if (!IsValid()) return;
4657 
4658  TSlave *wrk = FindSlave(s);
4659  MarkBad(wrk, reason);
4660 }
4661 
4662 ////////////////////////////////////////////////////////////////////////////////
4663 /// Ask an active worker 'wrk' to terminate, i.e. to shutdown
4664 
4666 {
 // A null worker is only reported as a warning
4667  if (!wrk) {
4668  Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4669  return;
4670  }
4671 
 // The kPROOF_STOP message is sent only if the worker's socket exists and is valid
4672  // Send stop message
4673  if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4674  TMessage mess(kPROOF_STOP);
4675  wrk->GetSocket()->Send(mess);
4676  } else {
4677  if (gDebug > 0)
4678  Info("TerminateWorker", "connection to worker is already down: cannot"
4679  " send termination message");
4680  }
4681 
 // NOTE(review): the statement implementing the comment below (listing line 4683)
 // is absent from this listing.
4682  // This is a bad worker from now on
4684 }
4685 
4686 ////////////////////////////////////////////////////////////////////////////////
4687 /// Ask an active worker 'ord' to terminate, i.e. to shutdown
4688 
4689 void TProof::TerminateWorker(const char *ord)
4690 {
4691  if (ord && strlen(ord) > 0) {
4692  Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
4693  if (IsMaster()) {
4694  TIter nxw(fSlaves);
4695  TSlave *wrk = 0;
4696  while ((wrk = (TSlave *)nxw())) {
4697  if (all || !strcmp(wrk->GetOrdinal(), ord)) {
4698  TerminateWorker(wrk);
4699  if (!all) break;
4700  }
4701  }
4702  } else {
4703  TMessage mess(kPROOF_STOP);
4704  mess << TString(ord);
4705  Broadcast(mess);
4706  }
4707  }
4708 }
4709 
4710 ////////////////////////////////////////////////////////////////////////////////
4711 /// Ping PROOF. Returns 1 if master server responded.
4712 
4714 {
 // Delegates to the list-based overload, restricted to the active workers
4715  return Ping(kActive);
4716 }
4717 
4718 ////////////////////////////////////////////////////////////////////////////////
4719 /// Ping PROOF slaves. Returns the number of slaves that responded.
4720 
4722 {
 // Map the selector onto the corresponding worker list
 // NOTE(review): 'slaves' stays null if 'list' matches none of the four values below,
 // and is dereferenced right after - confirm the selector has no other values.
4723  TList *slaves = 0;
4724  if (list == kAll) slaves = fSlaves;
4725  if (list == kActive) slaves = fActiveSlaves;
4726  if (list == kUnique) slaves = fUniqueSlaves;
4727  if (list == kAllUnique) slaves = fAllUniqueSlaves;
4728 
4729  if (slaves->GetSize() == 0) return 0;
4730 
 // Counts the valid workers whose Ping() did not return -1
4731  int nsent = 0;
4732  TIter next(slaves);
4733 
4734  TSlave *sl;
4735  while ((sl = (TSlave *)next())) {
4736  if (sl->IsValid()) {
 // A failed ping marks the worker as bad
4737  if (sl->Ping() == -1) {
4738  MarkBad(sl, "ping unsuccessful");
4739  } else {
4740  nsent++;
4741  }
4742  }
4743  }
4744 
4745  return nsent;
4746 }
4747 
4748 ////////////////////////////////////////////////////////////////////////////////
4749 /// Call Touch() on every valid PROOF slave in the list of all slaves.
4750 
4752 {
 // Always works on the full list of slaves
4753  TList *slaves = fSlaves;
4754 
4755  if (slaves->GetSize() == 0) return;
4756 
4757  TIter next(slaves);
4758 
 // Touch() is invoked on valid workers only; nothing is counted or returned
4759  TSlave *sl;
4760  while ((sl = (TSlave *)next())) {
4761  if (sl->IsValid()) {
4762  sl->Touch();
4763  }
4764  }
4765 
4766  return;
4767 }
4768 
4769 ////////////////////////////////////////////////////////////////////////////////
4770 /// Print status of PROOF cluster.
4771 
4772 void TProof::Print(Option_t *option) const
4773 {
 // Two layouts: a short summary on the client, a detailed report on a master.
 // Print() is const, hence the const_cast before every non-const call below
 // (SendPrint, AskStatistics, MarkBad, Collect).
 // NOTE(review): listing lines 4783, 4806 and 4824 (the trailing arguments of three
 // Printf() calls) are absent from this listing.
 // NOTE(review): 'secCont' is not used anywhere in the visible code.
4774  TString secCont;
4775 
4776  if (TestBit(TProof::kIsClient)) {
4777  Printf("Connected to: %s (%s)", GetMaster(),
4778  IsValid() ? "valid" : "invalid");
4779  Printf("Port number: %d", GetPort());
4780  Printf("User: %s", GetUser());
4781  Printf("ROOT version|rev: %s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4782  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
 // Security context and proofd protocol are read from the first active slave
4784  TSlave *sl = (TSlave *)fActiveSlaves->First();
4785  if (sl) {
4786  TString sc;
4787  if (sl->GetSocket()->GetSecContext())
4788  Printf("Security context: %s",
4789  sl->GetSocket()->GetSecContext()->AsString(sc));
4790  Printf("Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4791  } else {
4792  Printf("Security context: Error - No connection");
4793  Printf("Proofd protocol version: Error - No connection");
4794  }
4795  Printf("Client protocol version: %d", GetClientProtocol());
4796  Printf("Remote protocol version: %d", GetRemoteProtocol());
4797  Printf("Log level: %d", GetLogLevel());
4798  Printf("Session unique tag: %s", IsValid() ? GetSessionTag() : "");
4799  Printf("Default data pool: %s", IsValid() ? GetDataPoolUrl() : "");
 // On a valid session the print request is also sent on via SendPrint()
4800  if (IsValid())
4801  const_cast<TProof*>(this)->SendPrint(option);
4802  } else {
 // NOTE(review): gProofServ is dereferenced in this branch without a null check;
 // presumably it is always set when not running as client - confirm.
4803  const_cast<TProof*>(this)->AskStatistics();
4804  if (IsParallel())
4805  Printf("*** Master server %s (parallel mode, %d workers):",
4807  else
4808  Printf("*** Master server %s (sequential mode):",
4809  gProofServ->GetOrdinal());
4810 
4811  Printf("Master host name: %s", gSystem->HostName());
4812  Printf("Port number: %d", GetPort());
4813  if (strlen(gProofServ->GetGroup()) > 0) {
4814  Printf("User/Group: %s/%s", GetUser(), gProofServ->GetGroup());
4815  } else {
4816  Printf("User: %s", GetUser());
4817  }
 // The environment variable ROOTVERSIONTAG, when set, replaces the git commit
4818  TString ver;
4819  ver.Form("%s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4820  if (gSystem->Getenv("ROOTVERSIONTAG"))
4821  ver.Form("%s|%s", gROOT->GetVersion(), gSystem->Getenv("ROOTVERSIONTAG"));
4822  Printf("ROOT version|rev|tag: %s", ver.Data());
4823  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
4825  Printf("Protocol version: %d", GetClientProtocol());
4826  Printf("Image name: %s", GetImage());
4827  Printf("Working directory: %s", gSystem->WorkingDirectory());
4828  Printf("Config directory: %s", GetConfDir());
4829  Printf("Config file: %s", GetConfFile());
4830  Printf("Log level: %d", GetLogLevel());
4831  Printf("Number of workers: %d", GetNumberOfSlaves());
4832  Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
4833  Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
4834  Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
4835  Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
4836  Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
4837  Printf("Total real time used (s): %.3f", GetRealTime());
4838  Printf("Total CPU time used (s): %.3f", GetCpuTime());
 // An 'a' anywhere in the option (case-insensitive) adds the per-worker listing
4839  if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
4840  Printf("List of workers:");
4841  TList masters;
4842  TIter nextslave(fSlaves);
4843  while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4844  if (!sl->IsValid()) continue;
4845 
 // Plain workers are printed directly; sub-masters receive a kPROOF_PRINT
 // request and are queued in 'masters' for the Collect() call below
4846  if (sl->GetSlaveType() == TSlave::kSlave) {
4847  sl->Print(option);
4848  } else if (sl->GetSlaveType() == TSlave::kMaster) {
4849  TMessage mess(kPROOF_PRINT);
4850  mess.WriteString(option);
4851  if (sl->GetSocket()->Send(mess) == -1)
4852  const_cast<TProof*>(this)->MarkBad(sl, "could not send kPROOF_PRINT request");
4853  else
4854  masters.Add(sl);
4855  } else {
4856  Error("Print", "TSlave is neither Master nor Worker");
4857  R__ASSERT(0);
4858  }
4859  }
 // Collect on the queued sub-masters, with the fCollectTimeout timeout
4860  const_cast<TProof*>(this)->Collect(&masters, fCollectTimeout);
4861  }
4862  }
4863 }
4864 
4865 ////////////////////////////////////////////////////////////////////////////////
4866 /// Extract from opt information about output handling settings.
4867 /// The understood keywords are:
4868 /// of=<file>, outfile=<file> output file location
4869 /// ds=<dsname>, dataset=<dsname> dataset name ('of' and 'ds' are
4870 /// mutually exclusive,execution stops
4871 /// if both are found)
4872 /// stf[=<opt>], savetofile[=<opt>] control saving to file
4873 ///
4874 /// For 'stf', the <opt> integer has the following meaning:
4875 /// <opt> = <how>*10 + <force>
4876 /// <force> = 0 save to file if memory threshold is reached
4877 /// (the memory threshold is set by the cluster
4878 /// admin); in case an output file is defined, the
4879 /// files are merged at the end;
4880 /// 1 save results to file.
4881 /// <how> = 0 save at the end of the query
4882 /// 1 save results after each packet (to reduce the
4883 /// loss in case of crash).
4884 ///
4885 /// Setting 'ds' automatically sets 'stf=1'; it is still possible to set 'stf=11'
4886 /// to save results after each packet.
4887 ///
4888 /// The separator from the next option is either a ' ' or a ';'
4889 ///
4890 /// All recognized settings are removed from the input string opt.
4891 /// If action == 0, set up the output file accordingly, if action == 1 clean related
4892 /// output file settings.
4893 /// If the final target file is local then 'target' is set to the final local path
4894 /// when action == 0 and used to retrieve the file with TFile::Cp when action == 1.
4895 ///
4896 /// Output file settings are in the form
4897 ///
4898 /// <previous_option>of=name <next_option>
4899 /// <previous_option>outfile=name,...;<next_option>
4900 ///
4901 /// The separator from the next option is either a ' ' or a ';'
4902 /// Called internally by TProof::Process.
4903 ///
4904 /// Returns 0 on success, -1 on error.
4905 
4907 {
4908  TString outfile, dsname, stfopt;
4909  if (action == 0) {
4910  TString tagf, tagd, tags, oo;
4911  Ssiz_t from = 0, iof = kNPOS, iod = kNPOS, ios = kNPOS;
4912  while (opt.Tokenize(oo, from, "[; ]")) {
4913  if (oo.BeginsWith("of=")) {
4914  tagf = "of=";
4915  iof = opt.Index(tagf);
4916  } else if (oo.BeginsWith("outfile=")) {
4917  tagf = "outfile=";
4918  iof = opt.Index(tagf);
4919  } else if (oo.BeginsWith("ds")) {
4920  tagd = "ds";
4921  iod = opt.Index(tagd);
4922  } else if (oo.BeginsWith("dataset")) {
4923  tagd = "dataset";
4924  iod = opt.Index(tagd);
4925  } else if (oo.BeginsWith("stf")) {
4926  tags = "stf";
4927  ios = opt.Index(tags);
4928  } else if (oo.BeginsWith("savetofile")) {
4929  tags = "savetofile";
4930  ios = opt.Index(tags);
4931  }
4932  }
4933  // Check consistency
4934  if (iof != kNPOS && iod != kNPOS) {
4935  Error("HandleOutputOptions", "options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4936  return -1;
4937  }
4938 
4939  // Check output file first
4940  if (iof != kNPOS) {
4941  from = iof + tagf.Length();
4942  if (!opt.Tokenize(outfile, from, "[; ]") || outfile.IsNull()) {
4943  Error("HandleOutputOptions", "could not extract output file settings string! (%s)", opt.Data());
4944  return -1;
4945  }
4946  // For removal from original options string
4947  tagf += outfile;
4948  }
4949  // Check dataset
4950  if (iod != kNPOS) {
4951  from = iod + tagd.Length();
4952  if (!opt.Tokenize(dsname, from, "[; ]"))
4953  if (gDebug > 0) Info("HandleOutputOptions", "no dataset name found: use default");
4954  // For removal from original options string
4955  tagd += dsname;
4956  // The name may be empty or beginning with a '='
4957  if (dsname.BeginsWith("=")) dsname.Replace(0, 1, "");
4958  if (dsname.Contains("|V")) {
4959  target = "ds|V";
4960  dsname.ReplaceAll("|V", "");
4961  }
4962  if (dsname.IsNull()) dsname = "dataset_<qtag>";
4963  }
4964  // Check stf
4965  if (ios != kNPOS) {
4966  from = ios + tags.Length();
4967  if (!opt.Tokenize(stfopt, from, "[; ]"))
4968  if (gDebug > 0) Info("HandleOutputOptions", "save-to-file not found: use default");
4969  // For removal from original options string
4970  tags += stfopt;
4971  // It must be digit
4972  if (!stfopt.IsNull()) {
4973  if (stfopt.BeginsWith("=")) stfopt.Replace(0,1,"");
4974  if (!stfopt.IsNull()) {
4975  if (!stfopt.IsDigit()) {
4976  Error("HandleOutputOptions", "save-to-file option must be a digit! (%s)", stfopt.Data());
4977  return -1;
4978  }
4979  } else {
4980  // Default
4981  stfopt = "1";
4982  }
4983  } else {
4984  // Default
4985  stfopt = "1";
4986  }
4987  }
4988  // Remove from original options string
4989  opt.ReplaceAll(tagf, "");
4990  opt.ReplaceAll(tagd, "");
4991  opt.ReplaceAll(tags, "");
4992  }
4993 
4994  // Parse now
4995  if (action == 0) {
4996  // Output file
4997  if (!outfile.IsNull()) {
4998  if (!outfile.BeginsWith("master:")) {
5000  Warning("HandleOutputOptions",
5001  "directory '%s' for the output file does not exists or is not writable:"
5002  " saving to master", gSystem->GetDirName(outfile.Data()).Data());
5003  outfile.Form("master:%s", gSystem->BaseName(outfile.Data()));
5004  } else {
5005  if (!IsLite()) {
5006  // The target file is local, so we need to retrieve it
5007  target = outfile;
5008  if (!stfopt.IsNull()) {
5009  outfile.Form("master:%s", gSystem->BaseName(target.Data()));
5010  } else {
5011  outfile = "";
5012  }
5013  }
5014  }
5015  }
5016  if (outfile.BeginsWith("master:")) {
5017  outfile.ReplaceAll("master:", "");
5018  if (outfile.IsNull() || !gSystem->IsAbsoluteFileName(outfile)) {
5019  // Get the master data dir
5020  TString ddir, emsg;
5021  if (!IsLite()) {
5022  if (Exec("gProofServ->GetDataDir()", "0", kTRUE) == 0) {
5023  TObjString *os = fMacroLog.GetLineWith("const char");
5024  if (os) {
5025  Ssiz_t fst = os->GetString().First('\"');
5026  Ssiz_t lst = os->GetString().Last('\"');
5027  ddir = os->GetString()(fst+1, lst-fst-1);
5028  } else {
5029  emsg = "could not find 'const char *' string in macro log! cannot continue";
5030  }
5031  } else {
5032  emsg = "could not retrieve master data directory info! cannot continue";
5033  }
5034  if (!emsg.IsNull()) {
5035  Error("HandleOutputOptions", "%s", emsg.Data());
5036  return -1;
5037  }
5038  }
5039  if (!ddir.IsNull()) ddir += "/";
5040  if (outfile.IsNull()) {
5041  outfile.Form("%s<file>", ddir.Data());
5042  } else {
5043  outfile.Insert(0, TString::Format("%s", ddir.Data()));
5044  }
5045  }
5046  }
5047  // Set the parameter
5048  if (!outfile.IsNull()) {
5049  if (!outfile.BeginsWith("of:")) outfile.Insert(0, "of:");
5050  SetParameter("PROOF_DefaultOutputOption", outfile.Data());
5051  }
5052  }
5053  // Dataset creation
5054  if (!dsname.IsNull()) {
5055  dsname.Insert(0, "ds:");
5056  // Set the parameter
5057  SetParameter("PROOF_DefaultOutputOption", dsname.Data());
5058  // Check the Save-To-File option
5059  if (!stfopt.IsNull()) {
5060  Int_t ostf = (Int_t) stfopt.Atoi();
5061  if (ostf%10 <= 0) {
5062  Warning("HandleOutputOptions", "Dataset required bu Save-To-File disabled: enabling!");
5063  stfopt.Form("%d", ostf+1);
5064  }
5065  } else {
5066  // Minimal setting
5067  stfopt = "1";
5068  }
5069  }
5070  // Save-To-File options
5071  if (!stfopt.IsNull()) {
5072  // Set the parameter
5073  SetParameter("PROOF_SavePartialResults", (Int_t) stfopt.Atoi());
5074  }
5075  } else {
5076  // Retrieve the file, if required
5077  if (GetOutputList()) {
5078  if (target == "ds|V") {
5079  // Find the dataset
5080  dsname = "";
5081  TIter nxo(GetOutputList());
5082  TObject *o = 0;
5083  while ((o = nxo())) {