Logo ROOT   6.10/09
Reference Guide
TProof.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2 // Author: Fons Rademakers 13/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 /**
12  \defgroup proof PROOF
13 
14  Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
15 
16 */
17 
18 /**
19  \defgroup proofkernel PROOF kernel Libraries
20  \ingroup proof
21 
22  The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
23  the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
24  and handling of results.
25 
26 */
27 
28 /** \class TProof
29 \ingroup proofkernel
30 
31 This class controls a Parallel ROOT Facility, PROOF, cluster.
32 It fires the worker servers, it keeps track of how many workers are
33 running, it keeps track of the workers running status, it broadcasts
34 messages to all workers, it collects results, etc.
35 
36 */
37 
38 #include <stdlib.h>
39 #include <fcntl.h>
40 #include <errno.h>
41 #ifdef WIN32
42 # include <io.h>
43 # include <sys/stat.h>
44 # include <sys/types.h>
45 # include "snprintf.h"
46 #else
47 # include <unistd.h>
48 #endif
49 #include <vector>
50 
51 #include "RConfigure.h"
52 #include "Riostream.h"
53 #include "Getline.h"
54 #include "TBrowser.h"
55 #include "TChain.h"
56 #include "TCondor.h"
57 #include "TDSet.h"
58 #include "TError.h"
59 #include "TEnv.h"
60 #include "TEntryList.h"
61 #include "TEventList.h"
62 #include "TFile.h"
63 #include "TFileInfo.h"
64 #include "TFunction.h"
65 #include "TFTP.h"
66 #include "THashList.h"
67 #include "TInterpreter.h"
68 #include "TKey.h"
69 #include "TMap.h"
70 #include "TMath.h"
71 #include "TMessage.h"
72 #include "TMethodArg.h"
73 #include "TMethodCall.h"
74 #include "TMonitor.h"
75 #include "TObjArray.h"
76 #include "TObjString.h"
77 #include "TParameter.h"
78 #include "TProof.h"
79 #include "TProofNodeInfo.h"
80 #include "TProofOutputFile.h"
81 #include "TVirtualProofPlayer.h"
82 #include "TVirtualPacketizer.h"
83 #include "TProofServ.h"
84 #include "TPluginManager.h"
85 #include "TQueryResult.h"
86 #include "TRandom.h"
87 #include "TRegexp.h"
88 #include "TROOT.h"
89 #include "TSlave.h"
90 #include "TSocket.h"
91 #include "TSortedList.h"
92 #include "TSystem.h"
93 #include "TTree.h"
94 #include "TUrl.h"
95 #include "TFileCollection.h"
96 #include "TDataSetManager.h"
97 #include "TDataSetManagerFile.h"
98 #include "TMacro.h"
99 #include "TSelector.h"
100 #include "TPRegexp.h"
101 #include "TPackMgr.h"
102 
103 #include <mutex>
104 
106 
107 // Rotating indicator
108 char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
109 
110 TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
111 TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
112 
114 
115 //----- PROOF Interrupt signal handler -----------------------------------------
116 ////////////////////////////////////////////////////////////////////////////////
117 /// TProof interrupt handler.
118 
120 {
121  if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
122 
123  // Cannot ask the user : abort any remote processing
124  fProof->StopProcess(kTRUE);
125 
126  } else {
127  // Real stop or request to switch to asynchronous?
128  const char *a = 0;
129  if (fProof->GetRemoteProtocol() < 22) {
130  a = Getline("\nSwitch to asynchronous mode not supported remotely:"
131  "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
132  } else {
133  a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
134  " any other key to continue: ");
135  }
136  if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
137 
138  Info("Notify","Processing interrupt signal ... %c", a[0]);
139 
140  // Stop or abort any remote processing
141  Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
142  fProof->StopProcess(abort);
143 
144  } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
145  // Stop any remote processing
146  fProof->GoAsynchronous();
147  }
148  }
149 
150  return kTRUE;
151 }
152 
153 //----- Input handler for messages from TProofServ -----------------------------
154 ////////////////////////////////////////////////////////////////////////////////
155 /// Constructor
156 
158  : TFileHandler(s->GetDescriptor(),1),
159  fSocket(s), fProof(p)
160 {
161 }
162 
163 ////////////////////////////////////////////////////////////////////////////////
164 /// Handle input
165 
167 {
169  return kTRUE;
170 }
171 
172 
173 //------------------------------------------------------------------------------
174 
176 
177 ////////////////////////////////////////////////////////////////////////////////
178 /// Used to sort slaveinfos by ordinal.
179 
180 Int_t TSlaveInfo::Compare(const TObject *obj) const
181 {
182  if (!obj) return 1;
183 
184  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
185 
186  if (!si) return fOrdinal.CompareTo(obj->GetName());
187 
188  const char *myord = GetOrdinal();
189  const char *otherord = si->GetOrdinal();
190  while (myord && otherord) {
191  Int_t myval = atoi(myord);
192  Int_t otherval = atoi(otherord);
193  if (myval < otherval) return 1;
194  if (myval > otherval) return -1;
195  myord = strchr(myord, '.');
196  if (myord) myord++;
197  otherord = strchr(otherord, '.');
198  if (otherord) otherord++;
199  }
200  if (myord) return -1;
201  if (otherord) return 1;
202  return 0;
203 }
204 
205 ////////////////////////////////////////////////////////////////////////////////
206 /// Used to compare slaveinfos by ordinal.
207 
209 {
210  if (!obj) return kFALSE;
211  const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
212  if (!si) return kFALSE;
213  return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
214 }
215 
216 ////////////////////////////////////////////////////////////////////////////////
217 /// Print slave info. If opt = "active" print only the active
218 /// slaves, if opt="notactive" print only the not active slaves,
219 /// if opt = "bad" print only the bad slaves, else
220 /// print all slaves.
221 
222 void TSlaveInfo::Print(Option_t *opt) const
223 {
224  TString stat = fStatus == kActive ? "active" :
225  fStatus == kBad ? "bad" :
226  "not active";
227 
228  Bool_t newfmt = kFALSE;
229  TString oo(opt);
230  if (oo.Contains("N")) {
231  newfmt = kTRUE;
232  oo.ReplaceAll("N","");
233  }
234  if (oo == "active" && fStatus != kActive) return;
235  if (oo == "notactive" && fStatus != kNotActive) return;
236  if (oo == "bad" && fStatus != kBad) return;
237 
238  if (newfmt) {
239  TString msd, si, datadir;
240  if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
241  if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
242  if (fSysInfo.fCpus > 0) {
243  si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
244  fSysInfo.fCpus, fSysInfo.fPhysRam);
245  } else {
246  si.Form("| %s", fHostName.Data());
247  }
248  Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
249 
250  } else {
251  TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
252 
253  std::cout << "Slave: " << fOrdinal
254  << " hostname: " << fHostName
255  << " msd: " << msd
256  << " perf index: " << fPerfIndex
257  << " " << stat
258  << std::endl;
259  }
260 }
261 
262 ////////////////////////////////////////////////////////////////////////////////
263 /// Setter for fSysInfo
264 
266 {
267  fSysInfo.fOS = si.fOS; // OS
268  fSysInfo.fModel = si.fModel; // computer model
269  fSysInfo.fCpuType = si.fCpuType; // type of cpu
270  fSysInfo.fCpus = si.fCpus; // number of cpus
271  fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
272  fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
273  fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
274  fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
275 }
276 
278 
279 //------------------------------------------------------------------------------
280 
281 ////////////////////////////////////////////////////////////////////////////////
282 /// Destructor
283 
285 {
286  // Just delete the list, the objects are owned by other list
287  if (fWorkers) {
288  fWorkers->SetOwner(kFALSE);
289  SafeDelete(fWorkers);
290  }
291 }
292 ////////////////////////////////////////////////////////////////////////////////
293 /// Increase number of already merged workers by 1
294 
296 {
297  if (AreAllWorkersMerged())
298  Error("SetMergedWorker", "all workers have been already merged before!");
299  else
300  fMergedWorkers++;
301 }
302 
303 ////////////////////////////////////////////////////////////////////////////////
304 /// Add new worker to the list of workers to be merged by this merger
305 
307 {
308  if (!fWorkers)
309  fWorkers = new TList();
310  if (fWorkersToMerge == fWorkers->GetSize()) {
311  Error("AddWorker", "all workers have been already assigned to this merger");
312  return;
313  }
314  fWorkers->Add(sl);
315 }
316 
317 ////////////////////////////////////////////////////////////////////////////////
318 /// Return if merger has already merged all workers, i.e. if it has finished its merging job
319 
321 {
322  return (fWorkersToMerge == fMergedWorkers);
323 }
324 
325 ////////////////////////////////////////////////////////////////////////////////
326 /// Return if the determined number of workers has been already assigned to this merger
327 
329 {
330  if (!fWorkers)
331  return kFALSE;
332 
333  return (fWorkers->GetSize() == fWorkersToMerge);
334 }
335 
336 ////////////////////////////////////////////////////////////////////////////////
337 /// This is a private API function.
338 /// It checks whether the connection string contains a PoD cluster protocol.
339 /// If it does, then the connection string will be changed to reflect
340 /// a real PROOF connection string for a PROOF cluster managed by PoD.
341 /// PoD: http://pod.gsi.de .
342 /// Return -1 if the PoD request failed; return 0 otherwise.
343 
344 static Int_t PoDCheckUrl(TString *_cluster)
345 {
346  if ( !_cluster )
347  return 0;
348 
349  // trim spaces from both sides of the string
350  *_cluster = _cluster->Strip( TString::kBoth );
351  // PoD protocol string
352  const TString pod_prot("pod");
353 
354  // URL test
355  // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
356  TUrl url( _cluster->Data() );
357  if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
358  return 0;
359 
360  // PoD cluster is used
361  // call pod-info in a batch mode (-b).
362  // pod-info will find either a local PoD cluster or
363  // a remote one, manged by pod-remote.
364  *_cluster = gSystem->GetFromPipe("pod-info -c -b");
365  if( 0 == _cluster->Length() ) {
366  Error("PoDCheckUrl", "PoD server is not running");
367  return -1;
368  }
369  return 0;
370 }
371 
372 ////////////////////////////////////////////////////////////////////////////////
373 /// Create a PROOF environment. Starting PROOF involves either connecting
374 /// to a master server, which in turn will start a set of slave servers, or
375 /// directly starting as master server (if master = ""). Masterurl is of
376 /// the form: [proof[s]://]host[:port]. Conffile is the name of the config
377 /// file describing the remote PROOF cluster (this argument allows you to
378 /// describe different cluster configurations).
379 /// The default is proof.conf. Confdir is the directory where the config
380 /// file and other PROOF related files are (like motd and noproof files).
381 /// Loglevel is the log level (default = 1). User specified custom config
382 /// files will be first looked for in $HOME/.conffile.
383 
384 TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
385  Int_t loglevel, const char *alias, TProofMgr *mgr)
386  : fUrl(masterurl)
387 {
388  // Default initializations
389  InitMembers();
390 
391  // This may be needed during init
392  fManager = mgr;
393 
394  // Default server type
396 
397  // Default query mode
398  fQueryMode = kSync;
399 
400  // Parse the main URL, adjusting the missing fields and setting the relevant
401  // bits
404 
405  // Protocol and Host
406  if (!masterurl || strlen(masterurl) <= 0) {
407  fUrl.SetProtocol("proof");
408  fUrl.SetHost("__master__");
409  } else if (!(strstr(masterurl, "://"))) {
410  fUrl.SetProtocol("proof");
411  }
412  // Port
413  if (fUrl.GetPort() == TUrl(" ").GetPort())
414  fUrl.SetPort(TUrl("proof:// ").GetPort());
415 
416  // Make sure to store the FQDN, so to get a solid reference for subsequent checks
417  if (!strcmp(fUrl.GetHost(), "__master__"))
418  fMaster = fUrl.GetHost();
419  else if (!strlen(fUrl.GetHost()))
420  fMaster = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
421  else
422  fMaster = gSystem->GetHostByName(fUrl.GetHost()).GetHostName();
423 
424  // Server type
425  if (strlen(fUrl.GetOptions()) > 0) {
426  TString opts(fUrl.GetOptions());
427  if (!(strncmp(fUrl.GetOptions(),"std",3))) {
429  opts.Remove(0,3);
430  fUrl.SetOptions(opts.Data());
431  } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
433  opts.Remove(0,4);
434  fUrl.SetOptions(opts.Data());
435  }
436  }
437 
438  // Instance type
442  if (fMaster == "__master__") {
443  fMasterServ = kTRUE;
446  } else if (fMaster == "prooflite") {
447  // Client and master are merged
448  fMasterServ = kTRUE;
450  }
451  // Flag that we are a client
453  if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
454 
455  Init(masterurl, conffile, confdir, loglevel, alias);
456 
457  // If the user was not set, get it from the master
458  if (strlen(fUrl.GetUser()) <= 0) {
459  TString usr, emsg;
460  if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
461  TObjString *os = fMacroLog.GetLineWith("const char");
462  if (os) {
463  Ssiz_t fst = os->GetString().First('\"');
464  Ssiz_t lst = os->GetString().Last('\"');
465  usr = os->GetString()(fst+1, lst-fst-1);
466  } else {
467  emsg = "could not find 'const char *' string in macro log";
468  }
469  } else {
470  emsg = "could not retrieve user info";
471  }
472  if (!emsg.IsNull()) {
473  // Get user logon name
475  if (pw) {
476  usr = pw->fUser;
477  delete pw;
478  }
479  Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
480  }
481  // Set the user name in the main URL
482  fUrl.SetUser(usr.Data());
483  }
484 
485  // If called by a manager, make sure it stays in last position
486  // for cleaning
487  if (mgr) {
489  gROOT->GetListOfSockets()->Remove(mgr);
490  gROOT->GetListOfSockets()->Add(mgr);
491  }
492 
493  // Old-style server type: we add this to the list and set the global pointer
495  if (!gROOT->GetListOfProofs()->FindObject(this))
496  gROOT->GetListOfProofs()->Add(this);
497 
498  // Still needed by the packetizers: needs to be changed
499  gProof = this;
500 }
501 
502 ////////////////////////////////////////////////////////////////////////////////
503 /// Protected constructor to be used by classes deriving from TProof
504 /// (they have to call Init themselves and override StartSlaves
505 /// appropriately).
506 ///
507 /// This constructor runs the default member initializations, registers this
508 /// instance in the global list of PROOF sessions and sets gProof to it.
509 
511 {
512  // Default initializations
513  InitMembers();
514 
515  if (!gROOT->GetListOfProofs()->FindObject(this))
516  gROOT->GetListOfProofs()->Add(this);
517 
518  gProof = this;
519 }
520 
521 ////////////////////////////////////////////////////////////////////////////////
522 /// Default initializations
523 
525 {
526  fValid = kFALSE;
527  fTty = kFALSE;
528  fRecvMessages = 0;
529  fSlaveInfo = 0;
533  fLastPollWorkers_s = -1;
534  fActiveSlaves = 0;
535  fInactiveSlaves = 0;
536  fUniqueSlaves = 0;
537  fAllUniqueSlaves = 0;
538  fNonUniqueMasters = 0;
539  fActiveMonitor = 0;
540  fUniqueMonitor = 0;
541  fAllUniqueMonitor = 0;
542  fCurrentMonitor = 0;
543  fBytesRead = 0;
544  fRealTime = 0;
545  fCpuTime = 0;
546  fIntHandler = 0;
547  fProgressDialog = 0;
550  fPlayer = 0;
551  fFeedback = 0;
552  fChains = 0;
553  fDSet = 0;
554  fNotIdle = 0;
555  fSync = kTRUE;
557  fIsWaiting = kFALSE;
558  fRedirLog = kFALSE;
559  fLogFileW = 0;
560  fLogFileR = 0;
563  fMacroLog.SetName("ProofLogMacro");
564 
565  fWaitingSlaves = 0;
566  fQueries = 0;
567  fOtherQueries = 0;
568  fDrawQueries = 0;
569  fMaxDrawQueries = 1;
570  fSeqNum = 0;
571 
572  fSessionID = -1;
573  fEndMaster = kFALSE;
574 
575  fPackMgr = 0;
577 
578  fInputData = 0;
579 
580  fPrintProgress = 0;
581 
582  fLoadedMacros = 0;
583 
584  fProtocol = -1;
585  fSlaves = 0;
587  fBadSlaves = 0;
588  fAllMonitor = 0;
589  fDataReady = kFALSE;
590  fBytesReady = 0;
591  fTotalBytes = 0;
592  fAvailablePackages = 0;
593  fEnabledPackages = 0;
594  fRunningDSets = 0;
595 
596  fCollectTimeout = -1;
597 
598  fManager = 0;
599  fQueryMode = kSync;
601 
604  fMergers = 0;
605  fMergersCount = -1;
607  fWorkersToMerge = 0;
609 
610  fPerfTree = "";
611 
612  fWrksOutputReady = 0;
613 
614  fSelector = 0;
615 
616  fPrepTime = 0.;
617 
618  // Check if the user defined a list of environment variables to send over:
619  // include them into the dedicated list
620  if (gSystem->Getenv("PROOF_ENVVARS")) {
621  TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
622  Int_t from = 0;
623  while (envs.Tokenize(env, from, ",")) {
624  if (!env.IsNull()) {
625  if (!gSystem->Getenv(env)) {
626  Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
627  } else {
628  if (!envsfound.IsNull()) envsfound += ",";
629  envsfound += env;
630  TProof::DelEnvVar(env);
631  TProof::AddEnvVar(env, gSystem->Getenv(env));
632  }
633  }
634  }
635  if (envsfound.IsNull()) {
636  Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
637  } else {
638  Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
639  }
640  }
641 
642  // Done
643  return;
644 }
645 
646 ////////////////////////////////////////////////////////////////////////////////
647 /// Clean up PROOF environment.
648 
650 {
651  if (fChains) {
652  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
653  // remove "chain" from list
654  chain->SetProof(0);
655  RemoveChain(chain);
656  }
657  }
658 
659  // remove links to packages enabled on the client
660  if (TestBit(TProof::kIsClient)) {
661  // iterate over all packages
662  TList *epl = fPackMgr->GetListOfEnabled();
663  TIter nxp(epl);
664  while (TObjString *pck = (TObjString *)(nxp())) {
665  FileStat_t stat;
666  if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
667  // check if symlink, if so unlink
668  // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
669  // existing file or to a directory, but if fIsLink is true the symlink exists
670  if (stat.fIsLink)
671  gSystem->Unlink(pck->String());
672  }
673  }
674  }
675 
676  Close();
702  if (fWrksOutputReady) {
704  delete fWrksOutputReady;
705  }
706 
707  // remove file with redirected logs
708  if (TestBit(TProof::kIsClient)) {
709  if (fLogFileR)
710  fclose(fLogFileR);
711  if (fLogFileW)
712  fclose(fLogFileW);
713  if (fLogFileName.Length() > 0)
715  }
716 
717  // Remove for the global list
718  gROOT->GetListOfProofs()->Remove(this);
719  // ... and from the manager list
720  if (fManager && fManager->IsValid())
721  fManager->DiscardSession(this);
722 
723  if (gProof && gProof == this) {
724  // Set previous as default
725  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
726  while ((gProof = (TProof *)pvp())) {
727  if (gProof->InheritsFrom(TProof::Class()))
728  break;
729  }
730  }
731 
732  // For those interested in our destruction ...
733  Emit("~TProof()");
734  Emit("CloseWindow()");
735 }
736 
737 ////////////////////////////////////////////////////////////////////////////////
738 /// Start the PROOF environment. Starting PROOF involves either connecting
739 /// to a master server, which in turn will start a set of slave servers, or
740 /// directly starting as master server (if master = ""). For a description
741 /// of the arguments see the TProof ctor. Returns the number of started
742 /// master or slave servers, returns 0 in case of error, in which case
743 /// fValid remains false.
744 
745 Int_t TProof::Init(const char *, const char *conffile,
746  const char *confdir, Int_t loglevel, const char *alias)
747 {
749 
750  fValid = kFALSE;
751 
752  // Connected to terminal?
753  fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
754 
755  // If in attach mode, options is filled with additional info
756  Bool_t attach = kFALSE;
757  if (strlen(fUrl.GetOptions()) > 0) {
758  attach = kTRUE;
759  // A flag from the GUI
760  TString opts = fUrl.GetOptions();
761  if (opts.Contains("GUI")) {
763  opts.Remove(opts.Index("GUI"));
764  fUrl.SetOptions(opts);
765  }
766  }
767 
768  if (TestBit(TProof::kIsMaster)) {
769  // Fill default conf file and conf dir
770  if (!conffile || !conffile[0])
772  if (!confdir || !confdir[0])
774  // The group; the client receives it in the kPROOF_SESSIONTAG message
776  } else {
777  fConfDir = confdir;
778  fConfFile = conffile;
779  }
780 
781  // Analysise the conffile field
782  if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
784 
786  fLogLevel = loglevel;
789  fImage = fMasterServ ? "" : "<local>";
790  fIntHandler = 0;
791  fStatus = 0;
792  fRecvMessages = new TList;
794  fSlaveInfo = 0;
795  fChains = new TList;
796  fAvailablePackages = 0;
797  fEnabledPackages = 0;
798  fRunningDSets = 0;
800  fInputData = 0;
802  fPrintProgress = 0;
803 
804  // Timeout for some collect actions
805  fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
806 
807  // Should the workers be started dynamically; default: no
808  fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
809 
810  // Default entry point for the data pool is the master
812  fDataPoolUrl.Form("root://%s", fMaster.Data());
813  else
814  fDataPoolUrl = "";
815 
816  fProgressDialog = 0;
818 
819  // Default alias is the master name
820  TString al = (alias) ? alias : fMaster.Data();
821  SetAlias(al);
822 
823  // Client logging of messages from the master and slaves
824  fRedirLog = kFALSE;
825  if (TestBit(TProof::kIsClient)) {
826  fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
827  if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
828  Error("Init", "could not create temporary logfile");
829  if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
830  Error("Init", "could not open temp logfile for reading");
831  }
833 
834  // Status of cluster
835  fNotIdle = 0;
836  // Query type
837  fSync = (attach) ? kFALSE : kTRUE;
838  // Not enqueued
839  fIsWaiting = kFALSE;
840 
841  // Counters
842  fBytesRead = 0;
843  fRealTime = 0;
844  fCpuTime = 0;
845 
846  // List of queries
847  fQueries = 0;
848  fOtherQueries = 0;
849  fDrawQueries = 0;
850  fMaxDrawQueries = 1;
851  fSeqNum = 0;
852 
853  // Remote ID of the session
854  fSessionID = -1;
855 
856  // Part of active query
857  fWaitingSlaves = 0;
858 
859  // Make remote PROOF player
860  fPlayer = 0;
861  MakePlayer();
862 
863  fFeedback = new TList;
864  fFeedback->SetOwner();
865  fFeedback->SetName("FeedbackList");
867 
868  // sort slaves by descending performance index
870  fActiveSlaves = new TList;
871  fInactiveSlaves = new TList;
872  fUniqueSlaves = new TList;
873  fAllUniqueSlaves = new TList;
874  fNonUniqueMasters = new TList;
875  fBadSlaves = new TList;
876  fAllMonitor = new TMonitor;
877  fActiveMonitor = new TMonitor;
878  fUniqueMonitor = new TMonitor;
880  fCurrentMonitor = 0;
881 
884 
885  fLoadedMacros = 0;
886  fPackMgr = 0;
887 
888  // Enable optimized sending of streamer infos to use embedded backward/forward
889  // compatibility support between different ROOT versions and different versions of
890  // users classes
891  Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
892  if (enableSchemaEvolution) {
894  } else {
895  Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
896  }
897 
898  if (IsMaster()) {
899  // to make UploadPackage() method work on the master as well.
901  } else {
902 
903  TString sandbox;
904  if (GetSandbox(sandbox, kTRUE) != 0) {
905  Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
906  return 0;
907  }
908 
909  // Package Dir
910  TString packdir = gEnv->GetValue("Proof.PackageDir", "");
911  if (packdir.IsNull())
912  packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
913  if (AssertPath(packdir, kTRUE) != 0) {
914  Error("Init", "failure asserting directory %s", packdir.Data());
915  return 0;
916  }
917  fPackMgr = new TPackMgr(packdir);
918  if (gDebug > 0)
919  Info("Init", "package directory set to %s", packdir.Data());
920  }
921 
922  if (!IsMaster()) {
923  // List of directories where to look for global packages
924  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
925  TProofServ::ResolveKeywords(globpack);
926  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
927  if (gDebug > 0)
928  Info("Init", " %d global package directories registered", nglb);
929  }
930 
931  // Master may want dynamic startup
932  if (fDynamicStartup) {
933  if (!IsMaster()) {
934  // If on client - start the master
935  if (!StartSlaves(attach))
936  return 0;
937  }
938  } else {
939 
940  // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
941  // result retrieving, ...)
942  Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
943  if (!IsMaster() || !masterOnly) {
944  // Start slaves (the old, static, per-session way)
945  if (!StartSlaves(attach))
946  return 0;
947  // Client: Is Master in dynamic startup mode?
948  if (!IsMaster()) {
949  Int_t dyn = 0;
950  GetRC("Proof.DynamicStartup", dyn);
951  if (dyn != 0) fDynamicStartup = kTRUE;
952  }
953  }
954  }
955  // we are now properly initialized
956  fValid = kTRUE;
957 
958  // De-activate monitor (will be activated in Collect)
960 
961  // By default go into parallel mode
962  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
963  TNamed *n = 0;
964  if (TProof::GetEnvVars() &&
965  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
966  TString s(n->GetTitle());
967  if (s.IsDigit()) nwrk = s.Atoi();
968  }
969  GoParallel(nwrk, attach);
970 
971  // Send relevant initial state to slaves
972  if (!attach)
974  else if (!IsIdle())
975  // redirect log
976  fRedirLog = kTRUE;
977 
978  // Done at this point, the alias will be communicated to the coordinator, if any
980  SetAlias(al);
981 
982  SetActive(kFALSE);
983 
984  if (IsValid()) {
985 
986  // Activate input handler
988 
990  gROOT->GetListOfSockets()->Add(this);
991  }
992 
993  AskParallel();
994 
995  return fActiveSlaves->GetSize();
996 }
997 
998 ////////////////////////////////////////////////////////////////////////////////
999 /// Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
1000 /// Use the existing setting or the default if nothing is found.
1001 /// If 'assert' is kTRUE, make also sure that the path exists.
1002 /// Return 0 on success, -1 on failure
1003 
1004 Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1005 {
1006  // Get it from 'rc', if defined
1007  if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1008  // Or use the default 'rc'
1009  if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1010  // If nothing found , use the default
1011  if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1012  // Expand special settings
1013  if (sb == ".") {
1014  sb = gSystem->pwd();
1015  } else if (sb == "..") {
1016  sb = gSystem->DirName(gSystem->pwd());
1017  }
1018  gSystem->ExpandPathName(sb);
1019 
1020  // Assert the path, if required
1021  if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1022  // Done
1023  return 0;
1024 }
1025 
1026 ////////////////////////////////////////////////////////////////////////////////
1027 /// The config file field may contain special instructions which need to be
1028 /// parsed at the beginning, e.g. for debug runs with valgrind.
1029 /// Several options can be given separated by a ','
1030 
1031 void TProof::ParseConfigField(const char *config)
1032 {
1033  TString sconf(config), opt;
1034  Ssiz_t from = 0;
1035  Bool_t cpuPin = kFALSE;
1036 
1037  // Analysise the field
1038  const char *cq = (IsLite()) ? "\"" : "";
1039  while (sconf.Tokenize(opt, from, ",")) {
1040  if (opt.IsNull()) continue;
1041 
1042  if (opt.BeginsWith("valgrind")) {
1043  // Any existing valgrind setting? User can give full settings, which we fully respect,
1044  // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1045  // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1046  // will add option "--time-stamp --leak-check=full" to our default options
1047  TString mst, top, sub, wrk, all;
1048  TList *envs = fgProofEnvList;
1049  TNamed *n = 0;
1050  if (envs) {
1051  if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1052  all = n->GetTitle();
1053  if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1054  mst = n->GetTitle();
1055  if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1056  top = n->GetTitle();
1057  if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1058  sub = n->GetTitle();
1059  if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1060  wrk = n->GetTitle();
1061  }
1062  if (all != "" && mst == "") mst = all;
1063  if (all != "" && top == "") top = all;
1064  if (all != "" && sub == "") sub = all;
1065  if (all != "" && wrk == "") wrk = all;
1066  if (all != "" && all.BeginsWith("valgrind_opts:")) {
1067  // The field is used to add an option Reset the setting
1068  Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1069  " must be set again for next run , if any");
1070  TProof::DelEnvVar("PROOF_WRAPPERCMD");
1071  }
1072  TString var, cmd;
1073  cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1074  TString mstlab("NO"), wrklab("NO");
1075  Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1076  !opt.Contains("topmaster") && !opt.Contains("submaster")))
1077  ? kTRUE : kFALSE;
1078  if (doMaster) {
1079  if (!IsLite()) {
1080  // Check if we have to add a var
1081  if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1082  mst.ReplaceAll("valgrind_opts:","");
1083  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1084  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1085  mstlab = "YES";
1086  } else if (mst != "") {
1087  mstlab = "YES";
1088  }
1089  } else {
1090  if (opt.Contains("master")) {
1091  Warning("ParseConfigField",
1092  "master valgrinding does not make sense for PROOF-Lite: ignoring");
1093  opt.ReplaceAll("master", "");
1094  if (!opt.Contains("workers")) return;
1095  }
1096  if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1097  }
1098  }
1099  if (opt.Contains("topmaster")) {
1100  // Check if we have to add a var
1101  if (top == "" || top.BeginsWith("valgrind_opts:")) {
1102  top.ReplaceAll("valgrind_opts:","");
1103  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1104  TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1105  mstlab = "YES";
1106  } else if (top != "") {
1107  mstlab = "YES";
1108  }
1109  }
1110  if (opt.Contains("submaster")) {
1111  // Check if we have to add a var
1112  if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1113  sub.ReplaceAll("valgrind_opts:","");
1114  var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1115  TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1116  mstlab = "YES";
1117  } else if (sub != "") {
1118  mstlab = "YES";
1119  }
1120  }
1121  if (opt.Contains("=workers") || opt.Contains("+workers")) {
1122  // Check if we have to add a var
1123  if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1124  wrk.ReplaceAll("valgrind_opts:","");
1125  var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1126  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1127  TString nwrks("2");
1128  Int_t inw = opt.Index('#');
1129  if (inw != kNPOS) {
1130  nwrks = opt(inw+1, opt.Length());
1131  if (!nwrks.IsDigit()) nwrks = "2";
1132  }
1133  // Set the relevant variables
1134  if (!IsLite()) {
1135  TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1136  } else {
1137  gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1138  }
1139  wrklab = nwrks;
1140  // Register the additional worker log in the session file
1141  // (for the master this is done automatically)
1142  TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1143  } else if (wrk != "") {
1144  wrklab = "ALL";
1145  }
1146  }
1147  // Increase the relevant timeouts
1148  if (!IsLite()) {
1149  TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1150  gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1151  } else {
1152  gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1153  }
1154  // Warn for slowness
1155  Printf(" ");
1156  if (!IsLite()) {
1157  Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1158  } else {
1159  Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1160  }
1161  Printf(" ---> Please be patient: startup may be VERY slow ...");
1162  Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1163  Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1164  Printf(" ");
1165 
1166  } else if (opt.BeginsWith("igprof-pp")) {
1167 
1168  // IgProf profiling on master and worker. PROOF does not set the
1169  // environment for you: proper environment variables (like PATH and
1170  // LD_LIBRARY_PATH) should be set externally
1171 
1172  Printf("*** Requested IgProf performance profiling ***");
1173  TString addLogExt = "__igprof.pp__.log";
1174  TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1175  TString tmp;
1176 
1177  if (IsLite()) {
1178  addLogFmt.Append("\"");
1179  addLogFmt.Prepend("\"");
1180  }
1181 
1182  tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1183  TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1184 
1185  tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1186  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1187 
1188  TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1189 
1190  } else if (opt.BeginsWith("cpupin=")) {
1191  // Enable CPU pinning. Takes as argument the list of processor IDs
1192  // that will be used in order. Processor IDs are numbered from 0,
1193  // use likwid to see how they are organized. A possible parameter
1194  // format would be:
1195  //
1196  // cpupin=3+4+0+9+10+22+7
1197  //
1198  // Only the specified processor IDs will be used in a round-robin
1199  // fashion, dealing with the fact that you can request more workers
1200  // than the number of processor IDs you have specified.
1201  //
1202  // To use all available processors in their order:
1203  //
1204  // cpupin=*
1205 
1206  opt.Remove(0, 7);
1207 
1208  // Remove any char which is neither a number nor a plus '+'
1209  for (Ssiz_t i=0; i<opt.Length(); i++) {
1210  Char_t c = opt[i];
1211  if ((c != '+') && ((c < '0') || (c > '9')))
1212  opt[i] = '_';
1213  }
1214  opt.ReplaceAll("_", "");
1215  TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1216  cpuPin = kTRUE;
1217  } else if (opt.BeginsWith("workers=")) {
1218 
1219  // Request for a given number of workers (within the max) or worker
1220  // startup combination:
1221  // workers=5 start max 5 workers (or less, if less are assigned)
1222  // workers=2x start max 2 workers per node (or less, if less are assigned)
1223  opt.ReplaceAll("workers=","");
1224  TProof::AddEnvVar("PROOF_NWORKERS", opt);
1225  }
1226  }
1227 
1228  // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1229  #ifdef R__LINUX
1230  if (IsLite() && cpuPin) {
1231  Printf("*** Requested CPU pinning ***");
1232  const TList *ev = GetEnvVars();
1233  const char *pinCmd = "taskset -c <cpupin>";
1234  TString val;
1235  TNamed *p;
1236  if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1237  val = p->GetTitle();
1238  val.Insert(val.Length()-1, " ");
1239  val.Insert(val.Length()-1, pinCmd);
1240  }
1241  else {
1242  val.Form("\"%s\"", pinCmd);
1243  }
1244  TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1245  }
1246  #endif
1247 }
1248 
1249 ////////////////////////////////////////////////////////////////////////////////
1250 /// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1251 /// that the path is writable
1252 
1253 Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1254 {
1255  if (!inpath || strlen(inpath) <= 0) {
1256  Error("AssertPath", "undefined input path");
1257  return -1;
1258  }
1259 
1260  TString path(inpath);
1261  gSystem->ExpandPathName(path);
1262 
1263  if (gSystem->AccessPathName(path, kFileExists)) {
1264  if (gSystem->mkdir(path, kTRUE) != 0) {
1265  Error("AssertPath", "could not create path %s", path.Data());
1266  return -1;
1267  }
1268  }
1269  // It must be writable
1270  if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1271  if (gSystem->Chmod(path, 0666) != 0) {
1272  Error("AssertPath", "could not make path %s writable", path.Data());
1273  return -1;
1274  }
1275  }
1276 
1277  // Done
1278  return 0;
1279 }
1280 
1281 ////////////////////////////////////////////////////////////////////////////////
1282 /// Set manager and schedule its destruction after this for clean
1283 /// operations.
1284 
1286 {
1287  fManager = mgr;
1288 
1289  if (mgr) {
1291  gROOT->GetListOfSockets()->Remove(mgr);
1292  gROOT->GetListOfSockets()->Add(mgr);
1293  }
1294 }
1295 
1296 ////////////////////////////////////////////////////////////////////////////////
1297 /// Works on the master node only.
1298 /// It starts workers on the machines in workerList and sets the paths,
1299 /// packages and macros as on the master.
1300 /// It is a substitute for StartSlaves(...)
1301 /// The code is mostly the master part of StartSlaves,
1302 /// with the parallel startup removed.
1303 
1305 {
1306  if (!IsMaster()) {
1307  Error("AddWorkers", "AddWorkers can only be called on the master!");
1308  return -1;
1309  }
1310 
1311  if (!workerList || !(workerList->GetSize())) {
1312  Error("AddWorkers", "empty list of workers!");
1313  return -2;
1314  }
1315 
1316  // Code taken from master part of StartSlaves with the parllel part removed
1317 
1318  fImage = gProofServ->GetImage();
1319  if (fImage.IsNull())
1320  fImage.Form("%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(), gProofServ->GetWorkDir());
1321 
1322  // Get all workers
1323  UInt_t nSlaves = workerList->GetSize();
1324  UInt_t nSlavesDone = 0;
1325  Int_t ord = 0;
1326 
1327  // Loop over all new workers and start them (if we had already workers it means we are
1328  // increasing parallelism or that is not the first time we are called)
1329  Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1330 
1331  // A list of TSlave objects for workers that are being added
1332  TList *addedWorkers = new TList();
1333  if (!addedWorkers) {
1334  // This is needed to silence Coverity ...
1335  Error("AddWorkers", "cannot create new list for the workers to be added");
1336  return -2;
1337  }
1338  addedWorkers->SetOwner(kFALSE);
1339  TListIter next(workerList);
1340  TObject *to;
1341  TProofNodeInfo *worker;
1342  TSlaveInfo *dummysi = new TSlaveInfo();
1343  while ((to = next())) {
1344  // Get the next worker from the list
1345  worker = (TProofNodeInfo *)to;
1346 
1347  // Read back worker node info
1348  const Char_t *image = worker->GetImage().Data();
1349  const Char_t *workdir = worker->GetWorkDir().Data();
1350  Int_t perfidx = worker->GetPerfIndex();
1351  Int_t sport = worker->GetPort();
1352  if (sport == -1)
1353  sport = fUrl.GetPort();
1354 
1355  // Create worker server
1356  TString fullord;
1357  if (worker->GetOrdinal().Length() > 0) {
1358  fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1359  } else {
1360  fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1361  }
1362 
1363  // Remove worker from the list of workers terminated gracefully
1364  dummysi->SetOrdinal(fullord);
1365  TSlaveInfo *rmsi = (TSlaveInfo *)fTerminatedSlaveInfos->Remove(dummysi);
1366  SafeDelete(rmsi);
1367 
1368  // Create worker server
1369  TString wn(worker->GetNodeName());
1370  if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1371  TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1372  // Add group info in the password firdl, if any
1373  if (strlen(gProofServ->GetGroup()) > 0) {
1374  // Set also the user, otherwise the password is not exported
1375  if (strlen(u.GetUser()) <= 0)
1376  u.SetUser(gProofServ->GetUser());
1378  }
1379  TSlave *slave = 0;
1380  if (worker->IsWorker()) {
1381  slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1382  } else {
1383  slave = CreateSubmaster(u.GetUrl(), fullord,
1384  image, worker->GetMsd(), worker->GetNWrks());
1385  }
1386 
1387  // Add to global list (we will add to the monitor list after
1388  // finalizing the server startup)
1389  Bool_t slaveOk = kTRUE;
1390  fSlaves->Add(slave);
1391  if (slave->IsValid()) {
1392  addedWorkers->Add(slave);
1393  } else {
1394  slaveOk = kFALSE;
1395  fBadSlaves->Add(slave);
1396  Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1397  }
1398 
1399  PDB(kGlobal,3)
1400  Info("AddWorkers", "worker on host %s created"
1401  " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1402 
1403  // Notify opening of connection
1404  nSlavesDone++;
1406  m << TString("Opening connections to workers") << nSlaves
1407  << nSlavesDone << slaveOk;
1408  gProofServ->GetSocket()->Send(m);
1409 
1410  ord++;
1411  } //end of the worker loop
1412  SafeDelete(dummysi);
1413 
1414  // Cleanup
1415  SafeDelete(workerList);
1416 
1417  nSlavesDone = 0;
1418 
1419  // Here we finalize the server startup: in this way the bulk
1420  // of remote operations are almost parallelized
1421  TIter nxsl(addedWorkers);
1422  TSlave *sl = 0;
1423  while ((sl = (TSlave *) nxsl())) {
1424 
1425  // Finalize setup of the server
1426  if (sl->IsValid())
1427  sl->SetupServ(TSlave::kSlave, 0);
1428 
1429  // Monitor good slaves
1430  Bool_t slaveOk = kTRUE;
1431  if (sl->IsValid()) {
1432  fAllMonitor->Add(sl->GetSocket());
1433  PDB(kGlobal,3)
1434  Info("AddWorkers", "worker on host %s finalized"
1435  " and added to list", sl->GetOrdinal());
1436  } else {
1437  slaveOk = kFALSE;
1438  fBadSlaves->Add(sl);
1439  }
1440 
1441  // Notify end of startup operations
1442  nSlavesDone++;
1444  m << TString("Setting up worker servers") << nSlaves
1445  << nSlavesDone << slaveOk;
1446  gProofServ->GetSocket()->Send(m);
1447  }
1448 
1449  // Now set new state on the added workers (on all workers for simplicity)
1450  // use fEnabledPackages, fLoadedMacros,
1451  // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1452  // no need to load packages that are only loaded and not enabled (dyn mode)
1453  Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1454  TNamed *n = 0;
1455  if (TProof::GetEnvVars() &&
1456  (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1457  TString s(n->GetTitle());
1458  if (s.IsDigit()) nwrk = s.Atoi();
1459  }
1460 
1461  if (fDynamicStartup && goMoreParallel) {
1462 
1463  PDB(kGlobal, 3)
1464  Info("AddWorkers", "will invoke GoMoreParallel()");
1465  Int_t nw = GoMoreParallel(nwrk);
1466  PDB(kGlobal, 3)
1467  Info("AddWorkers", "GoMoreParallel()=%d", nw);
1468 
1469  }
1470  else {
1471  // Not in Dynamic Workers mode
1472  PDB(kGlobal, 3)
1473  Info("AddWorkers", "will invoke GoParallel()");
1474  GoParallel(nwrk, kFALSE, 0);
1475  }
1476 
1477  // Set worker processing environment
1478  SetupWorkersEnv(addedWorkers, goMoreParallel);
1479 
1480  // Update list of current workers
1481  PDB(kGlobal, 3)
1482  Info("AddWorkers", "will invoke SaveWorkerInfo()");
1483  SaveWorkerInfo();
1484 
1485  // Inform the client that the number of workers has changed
1486  if (fDynamicStartup && gProofServ) {
1487  PDB(kGlobal, 3)
1488  Info("AddWorkers", "will invoke SendParallel()");
1490 
1491  if (goMoreParallel && fPlayer) {
1492  // In case we are adding workers dynamically to an existing process, we
1493  // should invoke a special player's Process() to set only added workers
1494  // to the proper state
1495  PDB(kGlobal, 3)
1496  Info("AddWorkers", "will send the PROCESS message to selected workers");
1497  fPlayer->JoinProcess(addedWorkers);
1498  // Update merger counters (new workers are not yet active)
1499  fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1500  }
1501  }
1502 
1503  // Cleanup
1504  delete addedWorkers;
1505 
1506  return 0;
1507 }
1508 
1509 ////////////////////////////////////////////////////////////////////////////////
1510 /// Set up packages, loaded macros, include and lib paths ...
1511 
// Sets up packages, loaded macros, the dynamic library path and the include
// path on workers.
//   addedWorkers      - list handed to Load(), AddDynamicPath() and
//                       AddIncludePath(); also handed to UploadPackage() and
//                       EnablePackage() when both fDynamicStartup and
//                       increasingWorkers are set
//   increasingWorkers - together with fDynamicStartup, selects the variant of
//                       the package calls that receives 'addedWorkers'
1512 void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1513 {
1514  // Packages
// NOTE(review): the declaration of 'packs' (listing line 1515) is not visible
// here; presumably a collection of TPair entries (package name -> TList of
// arguments), judging from the casts below - confirm against the full file.
1516  if (packs && packs->GetSize() > 0) {
1517  TIter nxp(packs);
1518  TPair *pck = 0;
1519  while ((pck = (TPair *) nxp())) {
1520  // Upload and Enable methods are intelligent and avoid
1521  // re-uploading or re-enabling of a package to a node that has it.
1522  if (fDynamicStartup && increasingWorkers) {
1523  // Upload only on added workers
1524  PDB(kGlobal, 3)
1525  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
// The package is enabled only if the upload did not report an error (>= 0)
1526  if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1527  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1528  } else {
1529  PDB(kGlobal, 3)
1530  Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
// Same upload-then-enable sequence, without passing a worker list
1531  if (UploadPackage(pck->GetName()) >= 0)
1532  EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1533  }
1534  }
1535  }
1536 
1537  // Loaded macros
1538  if (fLoadedMacros) {
1539  TIter nxp(fLoadedMacros);
1540  TObjString *os = 0;
1541  while ((os = (TObjString *) nxp())) {
1542  PDB(kGlobal, 3) {
1543  Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1544  Printf("Loading a macro : %s", os->GetName());
1545  }
// Each entry of fLoadedMacros is loaded with 'addedWorkers' as target list
1546  Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1547  }
1548  }
1549 
1550  // Dynamic path
// ':' separators and double quotes are turned into blanks before the path
// string is passed on
1551  TString dyn = gSystem->GetDynamicPath();
1552  dyn.ReplaceAll(":", " ");
1553  dyn.ReplaceAll("\"", " ");
1554  PDB(kGlobal, 3)
1555  Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1556  AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1557 
1558  // Include path
// "-I" prefixes and double quotes are turned into blanks before the path
// string is passed on
1559  TString inc = gSystem->GetIncludePath();
1560  inc.ReplaceAll("-I", " ");
1561  inc.ReplaceAll("\"", " ");
1562  PDB(kGlobal, 3)
1563  Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1564  AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1565 
1566  // Done
1567  return;
1568 }
1569 
1570 ////////////////////////////////////////////////////////////////////////////////
1571 /// Used for shutting down the workers after a query is finished.
1572 /// Sends a kPROOF_STOP message to each of the workers in workerList.
1573 /// If workerList == 0, shut down all the workers.
1574 
1576 {
1577  if (!IsMaster()) {
1578  Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1579  return -1;
1580  }
1581 
1582  fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1583 
1584  if (!workerList) {
1585  // shutdown all the workers
1586  TIter nxsl(fSlaves);
1587  TSlave *sl = 0;
1588  while ((sl = (TSlave *) nxsl())) {
1589  // Shut down the worker assumig that it is not processing
1590  TerminateWorker(sl);
1591  }
1592 
1593  } else {
1594  if (!(workerList->GetSize())) {
1595  Error("RemoveWorkers", "The list of workers should not be empty!");
1596  return -2;
1597  }
1598 
1599  // Loop over all the workers and stop them
1600  TListIter next(workerList);
1601  TObject *to;
1602  TProofNodeInfo *worker;
1603  while ((to = next())) {
1604  TSlave *sl = 0;
1605  if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1606  // Get the next worker from the list
1607  worker = (TProofNodeInfo *)to;
1608  TIter nxsl(fSlaves);
1609  while ((sl = (TSlave *) nxsl())) {
1610  // Shut down the worker assumig that it is not processing
1611  if (sl->GetName() == worker->GetNodeName())
1612  break;
1613  }
1614  } else if (to->InheritsFrom(TSlave::Class())) {
1615  sl = (TSlave *) to;
1616  } else {
1617  Warning("RemoveWorkers","unknown object type: %s - it should be"
1618  " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1619  }
1620  // Shut down the worker assumig that it is not processing
1621  if (sl) {
1622  if (gDebug > 0)
1623  Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1624  TerminateWorker(sl);
1625  }
1626  }
1627  }
1628 
1629  // Update also the master counter
1630  if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1631 
1632  return 0;
1633 }
1634 
1635 ////////////////////////////////////////////////////////////////////////////////
1636 /// Start up PROOF slaves.
1637 
1639 {
1640  // If this is a master server, find the config file and start slave
1641  // servers as specified in the config file
1642  if (TestBit(TProof::kIsMaster)) {
1643 
1644  Int_t pc = 0;
1645  TList *workerList = new TList;
1646  // Get list of workers
1647  if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1648  TString emsg("no resource currently available for this session: please retry later");
1649  if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1650  gProofServ->SendAsynMessage(emsg.Data());
1651  return kFALSE;
1652  }
1653  // Setup the workers
1654  if (AddWorkers(workerList) < 0)
1655  return kFALSE;
1656 
1657  } else {
1658 
1659  // create master server
1660  Printf("Starting master: opening connection ...");
1661  TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1662 
1663  if (slave->IsValid()) {
1664 
1665  // Notify
1666  fprintf(stderr,"Starting master:"
1667  " connection open: setting up server ... \r");
1668  StartupMessage("Connection to master opened", kTRUE, 1, 1);
1669 
1670  if (!attach) {
1671 
1672  // Set worker interrupt handler
1673  slave->SetInterruptHandler(kTRUE);
1674 
1675  // Finalize setup of the server
1677 
1678  if (slave->IsValid()) {
1679 
1680  // Notify
1681  Printf("Starting master: OK ");
1682  StartupMessage("Master started", kTRUE, 1, 1);
1683 
1684  // check protocol compatibility
1685  // protocol 1 is not supported anymore
1686  if (fProtocol == 1) {
1687  Error("StartSlaves",
1688  "client and remote protocols not compatible (%d and %d)",
1690  slave->Close("S");
1691  delete slave;
1692  return kFALSE;
1693  }
1694 
1695  fSlaves->Add(slave);
1696  fAllMonitor->Add(slave->GetSocket());
1697 
1698  // Unset worker interrupt handler
1699  slave->SetInterruptHandler(kFALSE);
1700 
1701  // Set interrupt PROOF handler from now on
1702  fIntHandler = new TProofInterruptHandler(this);
1703 
1704  // Give-up after 5 minutes
1705  Int_t rc = Collect(slave, 300);
1706  Int_t slStatus = slave->GetStatus();
1707  if (slStatus == -99 || slStatus == -98 || rc == 0) {
1708  fSlaves->Remove(slave);
1709  fAllMonitor->Remove(slave->GetSocket());
1710  if (slStatus == -99)
1711  Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1712  else if (slStatus == -98)
1713  Error("StartSlaves", "could not setup output redirection on master");
1714  else
1715  Error("StartSlaves", "setting up master");
1716  slave->Close("S");
1717  delete slave;
1718  return 0;
1719  }
1720 
1721  if (!slave->IsValid()) {
1722  fSlaves->Remove(slave);
1723  fAllMonitor->Remove(slave->GetSocket());
1724  slave->Close("S");
1725  delete slave;
1726  Error("StartSlaves",
1727  "failed to setup connection with PROOF master server");
1728  return kFALSE;
1729  }
1730 
1731  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1732  if ((fProgressDialog =
1733  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1734  if (fProgressDialog->LoadPlugin() == -1)
1735  fProgressDialog = 0;
1736  }
1737  } else {
1738  // Notify
1739  Printf("Starting master: failure");
1740  }
1741  } else {
1742 
1743  // Notify
1744  Printf("Starting master: OK ");
1745  StartupMessage("Master attached", kTRUE, 1, 1);
1746 
1747  if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1748  if ((fProgressDialog =
1749  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1750  if (fProgressDialog->LoadPlugin() == -1)
1751  fProgressDialog = 0;
1752  }
1753 
1754  fSlaves->Add(slave);
1755  fIntHandler = new TProofInterruptHandler(this);
1756  }
1757 
1758  } else {
1759  delete slave;
1760  // Notify only if verbosity is on: most likely the failure has already been notified
1761  if (gDebug > 0)
1762  Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1763  return kFALSE;
1764  }
1765  }
1766 
1767  return kTRUE;
1768 }
1769 
1770 ////////////////////////////////////////////////////////////////////////////////
1771 /// Close all open slave servers.
1772 /// The client can decide to shut down the remote session by passing option 'S'
1773 /// or 's'. The default for clients is to detach, if supported. Masters always
1774 /// shut down the remote counterpart.
1775 
1777 {
1778  { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1779 
1780  fValid = kFALSE;
1781  if (fSlaves) {
1782  if (fIntHandler)
1783  fIntHandler->Remove();
1784 
1785  TIter nxs(fSlaves);
1786  TSlave *sl = 0;
1787  while ((sl = (TSlave *)nxs()))
1788  sl->Close(opt);
1789 
1790  fActiveSlaves->Clear("nodelete");
1791  fUniqueSlaves->Clear("nodelete");
1792  fAllUniqueSlaves->Clear("nodelete");
1793  fNonUniqueMasters->Clear("nodelete");
1794  fBadSlaves->Clear("nodelete");
1795  fInactiveSlaves->Clear("nodelete");
1796  fSlaves->Delete();
1797  }
1798  }
1799 
1801  gROOT->GetListOfSockets()->Remove(this);
1802 
1803  if (fChains) {
1804  while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1805  // remove "chain" from list
1806  chain->SetProof(0);
1807  RemoveChain(chain);
1808  }
1809  }
1810 
1811  if (IsProofd()) {
1812 
1813  gROOT->GetListOfProofs()->Remove(this);
1814  if (gProof && gProof == this) {
1815  // Set previous proofd-related as default
1816  TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1817  while ((gProof = (TProof *)pvp())) {
1818  if (gProof->IsProofd())
1819  break;
1820  }
1821  }
1822  }
1823  }
1824 }
1825 
1826 ////////////////////////////////////////////////////////////////////////////////
1827 /// Create a new TSlave of type TSlave::kSlave.
1828 /// Note: creation of TSlave is private with TProof as a friend.
1829 /// Derived classes must use this function to create slaves.
1830 
1831 TSlave *TProof::CreateSlave(const char *url, const char *ord,
1832  Int_t perf, const char *image, const char *workdir)
1833 {
1834  TSlave* sl = TSlave::Create(url, ord, perf, image,
1835  this, TSlave::kSlave, workdir, 0);
1836 
1837  if (sl->IsValid()) {
1838  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1839  // must set fParallel to 1 for slaves since they do not
1840  // report their fParallel with a LOG_DONE message
1841  sl->fParallel = 1;
1842  }
1843 
1844  return sl;
1845 }
1846 
1847 
1848 ////////////////////////////////////////////////////////////////////////////////
1849 /// Create a new TSlave of type TSlave::kMaster.
1850 /// Note: creation of TSlave is private with TProof as a friend.
1851 /// Derived classes must use this function to create slaves.
1852 
1853 TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1854  const char *image, const char *msd, Int_t nwk)
1855 {
1856  TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1857  TSlave::kMaster, 0, msd, nwk);
1858 
1859  if (sl->IsValid()) {
1860  sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1861  }
1862 
1863  return sl;
1864 }
1865 
1866 ////////////////////////////////////////////////////////////////////////////////
1867 /// Find slave that has TSocket s. Returns 0 in case slave is not found.
1868 
1870 {
1871  TSlave *sl;
1872  TIter next(fSlaves);
1873 
1874  while ((sl = (TSlave *)next())) {
1875  if (sl->IsValid() && sl->GetSocket() == s)
1876  return sl;
1877  }
1878  return 0;
1879 }
1880 
1881 ////////////////////////////////////////////////////////////////////////////////
1882 /// Add to the fUniqueSlaves list the active slaves that have a unique
1883 /// (user) file system image. This information is used to transfer files
1884 /// only once to nodes that share a file system (an image). Submasters
1885 /// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1886 /// list. That list is used to trigger the transferring of files to
1887 /// the submaster's unique slaves without the need to transfer the file
1888 /// to the submaster.
1889 
1891 {
1892  fUniqueSlaves->Clear();
1897 
1898  TIter next(fActiveSlaves);
1899 
1900  while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1901  if (fImage == sl->fImage) {
1902  if (sl->GetSlaveType() == TSlave::kMaster) {
1903  fNonUniqueMasters->Add(sl);
1904  fAllUniqueSlaves->Add(sl);
1905  fAllUniqueMonitor->Add(sl->GetSocket());
1906  }
1907  continue;
1908  }
1909 
1910  TIter next2(fUniqueSlaves);
1911  TSlave *replace_slave = 0;
1912  Bool_t add = kTRUE;
1913  while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1914  if (sl->fImage == sl2->fImage) {
1915  add = kFALSE;
1916  if (sl->GetSlaveType() == TSlave::kMaster) {
1917  if (sl2->GetSlaveType() == TSlave::kSlave) {
1918  // give preference to master
1919  replace_slave = sl2;
1920  add = kTRUE;
1921  } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1922  fNonUniqueMasters->Add(sl);
1923  fAllUniqueSlaves->Add(sl);
1924  fAllUniqueMonitor->Add(sl->GetSocket());
1925  } else {
1926  Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1927  R__ASSERT(0);
1928  }
1929  }
1930  break;
1931  }
1932  }
1933 
1934  if (add) {
1935  fUniqueSlaves->Add(sl);
1936  fAllUniqueSlaves->Add(sl);
1937  fUniqueMonitor->Add(sl->GetSocket());
1938  fAllUniqueMonitor->Add(sl->GetSocket());
1939  if (replace_slave) {
1940  fUniqueSlaves->Remove(replace_slave);
1941  fAllUniqueSlaves->Remove(replace_slave);
1942  fUniqueMonitor->Remove(replace_slave->GetSocket());
1943  fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1944  }
1945  }
1946  }
1947 
1948  // will be actiavted in Collect()
1951 }
1952 
1953 ////////////////////////////////////////////////////////////////////////////////
1954 /// Return number of slaves as described in the config file.
1955 
1957 {
1958  return fSlaves->GetSize();
1959 }
1960 
1961 ////////////////////////////////////////////////////////////////////////////////
1962 /// Return number of active slaves, i.e. slaves that are valid and in
1963 /// the current computing group.
1964 
1966 {
1967  return fActiveSlaves->GetSize();
1968 }
1969 
1970 ////////////////////////////////////////////////////////////////////////////////
1971 /// Return number of inactive slaves, i.e. slaves that are valid but not in
1972 /// the current computing group.
1973 
1975 {
1976  return fInactiveSlaves->GetSize();
1977 }
1978 
1979 ////////////////////////////////////////////////////////////////////////////////
1980 /// Return number of unique slaves, i.e. active slaves that each have a
1981 /// unique user file system.
1982 
1984 {
1985  return fUniqueSlaves->GetSize();
1986 }
1987 
1988 ////////////////////////////////////////////////////////////////////////////////
1989 /// Return number of bad slaves. These are slaves that were in the config
1990 /// file, but refused to start up or died during the PROOF session.
1991 
1993 {
1994  return fBadSlaves->GetSize();
1995 }
1996 
1997 ////////////////////////////////////////////////////////////////////////////////
1998 /// Ask for the statistics of the slaves.
1999 
2001 {
2002  if (!IsValid()) return;
2003 
2006 }
2007 
2008 ////////////////////////////////////////////////////////////////////////////////
2009 /// Get statistics about CPU time, real time and bytes read.
2010 /// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2011 /// and GetBytesRead()).
2012 
2014 {
2015  if (fProtocol > 27) {
2016  // This returns the correct result
2017  AskStatistics();
2018  } else {
2019  // AskStatistics is buggy: parse the output of Print()
2020  RedirectHandle_t rh;
2021  gSystem->RedirectOutput(fLogFileName, "a", &rh);
2022  Print();
2023  gSystem->RedirectOutput(0, 0, &rh);
2024  TMacro *mp = GetLastLog();
2025  if (mp) {
2026  // Look for global directories
2027  TIter nxl(mp->GetListOfLines());
2028  TObjString *os = 0;
2029  while ((os = (TObjString *) nxl())) {
2030  TString s(os->GetName());
2031  if (s.Contains("Total MB's processed:")) {
2032  s.ReplaceAll("Total MB's processed:", "");
2033  if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2034  } else if (s.Contains("Total real time used (s):")) {
2035  s.ReplaceAll("Total real time used (s):", "");
2036  if (s.IsFloat()) fRealTime = s.Atof();
2037  } else if (s.Contains("Total CPU time used (s):")) {
2038  s.ReplaceAll("Total CPU time used (s):", "");
2039  if (s.IsFloat()) fCpuTime = s.Atof();
2040  }
2041  }
2042  delete mp;
2043  }
2044  }
2045 
2046  if (verbose) {
2047  Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2048  GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2049  }
2050 }
2051 
2052 ////////////////////////////////////////////////////////////////////////////////
2053 /// Ask for the number of parallel slaves.
2054 
2056 {
2057  if (!IsValid()) return;
2058 
2061 }
2062 
2063 ////////////////////////////////////////////////////////////////////////////////
2064 /// Ask the master for the list of queries.
2065 
2067 {
2068  if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2069 
2070  Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
2072  m << all;
2073  Broadcast(m, kActive);
2075 
2076  // This should have been filled by now
2077  return fQueries;
2078 }
2079 
2080 ////////////////////////////////////////////////////////////////////////////////
2081 /// Number of queries processed by this session
2082 
2084 {
2085  if (fQueries)
2086  return fQueries->GetSize() - fOtherQueries;
2087  return 0;
2088 }
2089 
2090 ////////////////////////////////////////////////////////////////////////////////
2091 /// Set max number of draw queries whose results are saved
2092 
2094 {
2095  if (max > 0) {
2096  if (fPlayer)
2097  fPlayer->SetMaxDrawQueries(max);
2098  fMaxDrawQueries = max;
2099  }
2100 }
2101 
2102 ////////////////////////////////////////////////////////////////////////////////
2103 /// Get max number of queries whose full results are kept in the
2104 /// remote sandbox
2105 
2107 {
2109  m << kFALSE;
2110  Broadcast(m, kActive);
2112 }
2113 
2114 ////////////////////////////////////////////////////////////////////////////////
2115 /// Return pointer to the list of query results in the player
2116 
2118 {
2119  return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2120 }
2121 
2122 ////////////////////////////////////////////////////////////////////////////////
2123 /// Return pointer to the full TQueryResult instance owned by the player
2124 /// and referenced by 'ref'. If ref = 0 or "", return the last query result.
2125 
2127 {
2128  return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2129 }
2130 
2131 ////////////////////////////////////////////////////////////////////////////////
2132 /// Ask the master for the list of queries.
2133 /// Options:
2134 /// "A" show information about all the queries known to the
2135 /// server, i.e. even those processed by other sessions
2136 /// "L" show only information about queries locally available
2137 /// i.e. already retrieved. If "L" is specified, "A" is
2138 /// ignored.
2139 /// "F" show all details available about queries
2140 /// "H" print help menu
2141 /// Default ""
2142 
2144 {
2145  Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2146  if (help) {
2147 
2148  // Help
2149 
2150  Printf("+++");
2151  Printf("+++ Options: \"A\" show all queries known to server");
2152  Printf("+++ \"L\" show retrieved queries");
2153  Printf("+++ \"F\" full listing of query info");
2154  Printf("+++ \"H\" print this menu");
2155  Printf("+++");
2156  Printf("+++ (case insensitive)");
2157  Printf("+++");
2158  Printf("+++ Use Retrieve(<#>) to retrieve the full"
2159  " query results from the master");
2160  Printf("+++ e.g. Retrieve(8)");
2161 
2162  Printf("+++");
2163 
2164  return;
2165  }
2166 
2167  if (!IsValid()) return;
2168 
2169  Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2170 
2171  TObject *pq = 0;
2172  if (!local) {
2173  GetListOfQueries(opt);
2174 
2175  if (!fQueries) return;
2176 
2177  TIter nxq(fQueries);
2178 
2179  // Queries processed by other sessions
2180  if (fOtherQueries > 0) {
2181  Printf("+++");
2182  Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2183  Int_t nq = 0;
2184  while (nq++ < fOtherQueries && (pq = nxq()))
2185  pq->Print(opt);
2186  }
2187 
2188  // Queries processed by this session
2189  Printf("+++");
2190  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2192  while ((pq = nxq()))
2193  pq->Print(opt);
2194 
2195  } else {
2196 
2197  // Queries processed by this session
2198  Printf("+++");
2199  Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2201 
2202  // Queries available locally
2203  TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2204  if (listlocal) {
2205  Printf("+++");
2206  Printf("+++ Queries available locally: %d", listlocal->GetSize());
2207  TIter nxlq(listlocal);
2208  while ((pq = nxlq()))
2209  pq->Print(opt);
2210  }
2211  }
2212  Printf("+++");
2213 }
2214 
2215 ////////////////////////////////////////////////////////////////////////////////
2216 /// See if the data is ready to be analyzed.
2217 
2218 Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
2219 {
2220  if (!IsValid()) return kFALSE;
2221 
2222  TList submasters;
2223  TIter nextSlave(GetListOfActiveSlaves());
2224  while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2225  if (sl->GetSlaveType() == TSlave::kMaster) {
2226  submasters.Add(sl);
2227  }
2228  }
2229 
2230  fDataReady = kTRUE; //see if any submasters set it to false
2231  fBytesReady = 0;
2232  fTotalBytes = 0;
2233  //loop over submasters and see if data is ready
2234  if (submasters.GetSize() > 0) {
2235  Broadcast(kPROOF_DATA_READY, &submasters);
2236  Collect(&submasters);
2237  }
2238 
2239  bytesready = fBytesReady;
2240  totalbytes = fTotalBytes;
2241 
2242  EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2243 
2244  PDB(kGlobal,2)
2245  Info("IsDataReady", "%lld / %lld (%s)",
2246  bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2247 
2248  return fDataReady;
2249 }
2250 
2251 ////////////////////////////////////////////////////////////////////////////////
2252 /// Send interrupt to master or slave servers.
2253 
2255 {
2256  if (!IsValid()) return;
2257 
2258  TList *slaves = 0;
2259  if (list == kAll) slaves = fSlaves;
2260  if (list == kActive) slaves = fActiveSlaves;
2261  if (list == kUnique) slaves = fUniqueSlaves;
2262  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2263 
2264  if (slaves->GetSize() == 0) return;
2265 
2266  TSlave *sl;
2267  TIter next(slaves);
2268 
2269  while ((sl = (TSlave *)next())) {
2270  if (sl->IsValid()) {
2271 
2272  // Ask slave to propagate the interrupt request
2273  sl->Interrupt((Int_t)type);
2274  }
2275  }
2276 }
2277 
2278 ////////////////////////////////////////////////////////////////////////////////
2279 /// Returns number of slaves active in parallel mode. Returns 0 in case
2280 /// there are no active slaves. Returns -1 in case of error.
2281 
2283 {
2284  if (!IsValid()) return -1;
2285 
2286  // iterate over active slaves and return total number of slaves
2287  TIter nextSlave(GetListOfActiveSlaves());
2288  Int_t nparallel = 0;
2289  while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2290  if (sl->GetParallel() >= 0)
2291  nparallel += sl->GetParallel();
2292 
2293  return nparallel;
2294 }
2295 
2296 ////////////////////////////////////////////////////////////////////////////////
2297 /// Returns list of TSlaveInfo's. In case of error return 0.
2298 
2300 {
2301  if (!IsValid()) return 0;
2302 
2303  if (fSlaveInfo == 0) {
2305  fSlaveInfo->SetOwner();
2306  } else {
2307  fSlaveInfo->Delete();
2308  }
2309 
2310  TList masters;
2311  TIter next(GetListOfSlaves());
2312  TSlave *slave;
2313 
2314  while ((slave = (TSlave *) next()) != 0) {
2315  if (slave->GetSlaveType() == TSlave::kSlave) {
2316  const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2317  TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2318  name,
2319  slave->GetPerfIdx());
2320  fSlaveInfo->Add(slaveinfo);
2321 
2322  TIter nextactive(GetListOfActiveSlaves());
2323  TSlave *activeslave;
2324  while ((activeslave = (TSlave *) nextactive())) {
2325  if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2326  slaveinfo->SetStatus(TSlaveInfo::kActive);
2327  break;
2328  }
2329  }
2330 
2331  TIter nextbad(GetListOfBadSlaves());
2332  TSlave *badslave;
2333  while ((badslave = (TSlave *) nextbad())) {
2334  if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2335  slaveinfo->SetStatus(TSlaveInfo::kBad);
2336  break;
2337  }
2338  }
2339  // Get system info if supported
2340  if (slave->IsValid()) {
2341  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2342  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2343  else
2344  masters.Add(slave);
2345  }
2346 
2347  } else if (slave->GetSlaveType() == TSlave::kMaster) {
2348  if (slave->IsValid()) {
2349  if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2350  MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2351  else
2352  masters.Add(slave);
2353  }
2354  } else {
2355  Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2356  R__ASSERT(0);
2357  }
2358  }
2359  if (masters.GetSize() > 0) Collect(&masters);
2360 
2361  return fSlaveInfo;
2362 }
2363 
2364 ////////////////////////////////////////////////////////////////////////////////
2365 /// Activate slave server list.
2366 
2368 {
2369  TMonitor *mon = fAllMonitor;
2370  mon->DeActivateAll();
2371 
2372  slaves = !slaves ? fActiveSlaves : slaves;
2373 
2374  TIter next(slaves);
2375  TSlave *sl;
2376  while ((sl = (TSlave*) next())) {
2377  if (sl->IsValid())
2378  mon->Activate(sl->GetSocket());
2379  }
2380 }
2381 
2382 ////////////////////////////////////////////////////////////////////////////////
2383 /// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2384 /// monitored by 'mon'.
2385 
2387 {
2388  TMonitor *m = (mon) ? mon : fCurrentMonitor;
2389  if (m) {
2390  if (on)
2391  m->ActivateAll();
2392  else
2393  m->DeActivateAll();
2394  }
2395 }
2396 
2397 ////////////////////////////////////////////////////////////////////////////////
2398 /// Broadcast the group priority to all workers in the specified list. Returns
2399 /// the number of workers the message was successfully sent to.
2400 /// Returns -1 in case of error.
2401 
2402 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2403 {
2404  if (!IsValid()) return -1;
2405 
2406  if (workers->GetSize() == 0) return 0;
2407 
2408  int nsent = 0;
2409  TIter next(workers);
2410 
2411  TSlave *wrk;
2412  while ((wrk = (TSlave *)next())) {
2413  if (wrk->IsValid()) {
2414  if (wrk->SendGroupPriority(grp, priority) == -1)
2415  MarkBad(wrk, "could not send group priority");
2416  else
2417  nsent++;
2418  }
2419  }
2420 
2421  return nsent;
2422 }
2423 
2424 ////////////////////////////////////////////////////////////////////////////////
2425 /// Broadcast the group priority to all workers in the specified list. Returns
2426 /// the number of workers the message was successfully sent to.
2427 /// Returns -1 in case of error.
2428 
2429 Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2430 {
2431  TList *workers = 0;
2432  if (list == kAll) workers = fSlaves;
2433  if (list == kActive) workers = fActiveSlaves;
2434  if (list == kUnique) workers = fUniqueSlaves;
2435  if (list == kAllUnique) workers = fAllUniqueSlaves;
2436 
2437  return BroadcastGroupPriority(grp, priority, workers);
2438 }
2439 
2440 ////////////////////////////////////////////////////////////////////////////////
2441 /// Reset the merge progress notificator
2442 
2444 {
2446 }
2447 
2448 ////////////////////////////////////////////////////////////////////////////////
2449 /// Broadcast a message to all slaves in the specified list. Returns
2450 /// the number of slaves the message was successfully sent to.
2451 /// Returns -1 in case of error.
2452 
2453 Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
2454 {
2455  if (!IsValid()) return -1;
2456 
2457  if (!slaves || slaves->GetSize() == 0) return 0;
2458 
2459  int nsent = 0;
2460  TIter next(slaves);
2461 
2462  TSlave *sl;
2463  while ((sl = (TSlave *)next())) {
2464  if (sl->IsValid()) {
2465  if (sl->GetSocket()->Send(mess) == -1)
2466  MarkBad(sl, "could not broadcast request");
2467  else
2468  nsent++;
2469  }
2470  }
2471 
2472  return nsent;
2473 }
2474 
2475 ////////////////////////////////////////////////////////////////////////////////
2476 /// Broadcast a message to all slaves in the specified list (either
2477 /// all slaves or only the active slaves). Returns the number of slaves
2478 /// the message was successfully sent to. Returns -1 in case of error.
2479 
2481 {
2482  TList *slaves = 0;
2483  if (list == kAll) slaves = fSlaves;
2484  if (list == kActive) slaves = fActiveSlaves;
2485  if (list == kUnique) slaves = fUniqueSlaves;
2486  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2487 
2488  return Broadcast(mess, slaves);
2489 }
2490 
2491 ////////////////////////////////////////////////////////////////////////////////
2492 /// Broadcast a character string buffer to all slaves in the specified
2493 /// list. Use kind to set the TMessage what field. Returns the number of
2494 /// slaves the message was sent to. Returns -1 in case of error.
2495 
2496 Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2497 {
2498  TMessage mess(kind);
2499  if (str) mess.WriteString(str);
2500  return Broadcast(mess, slaves);
2501 }
2502 
2503 ////////////////////////////////////////////////////////////////////////////////
2504 /// Broadcast a character string buffer to all slaves in the specified
2505 /// list (either all slaves or only the active slaves). Use kind to
2506 /// set the TMessage what field. Returns the number of slaves the message
2507 /// was sent to. Returns -1 in case of error.
2508 
2509 Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2510 {
2511  TMessage mess(kind);
2512  if (str) mess.WriteString(str);
2513  return Broadcast(mess, list);
2514 }
2515 
2516 ////////////////////////////////////////////////////////////////////////////////
2517 /// Broadcast an object to all slaves in the specified list. Use kind to
2518 /// set the TMessage what field. Returns the number of slaves the message
2519 /// was sent to. Returns -1 in case of error.
2520 
2522 {
2523  TMessage mess(kind);
2524  mess.WriteObject(obj);
2525  return Broadcast(mess, slaves);
2526 }
2527 
2528 ////////////////////////////////////////////////////////////////////////////////
2529 /// Broadcast an object to all slaves in the specified list. Use kind to
2530 /// set the TMessage what field. Returns the number of slaves the message
2531 /// was sent to. Returns -1 in case of error.
2532 
2534 {
2535  TMessage mess(kind);
2536  mess.WriteObject(obj);
2537  return Broadcast(mess, list);
2538 }
2539 
2540 ////////////////////////////////////////////////////////////////////////////////
2541 /// Broadcast a raw buffer of specified length to all slaves in the
2542 /// specified list. Returns the number of slaves the buffer was sent to.
2543 /// Returns -1 in case of error.
2544 
2545 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2546 {
2547  if (!IsValid()) return -1;
2548 
2549  if (slaves->GetSize() == 0) return 0;
2550 
2551  int nsent = 0;
2552  TIter next(slaves);
2553 
2554  TSlave *sl;
2555  while ((sl = (TSlave *)next())) {
2556  if (sl->IsValid()) {
2557  if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2558  MarkBad(sl, "could not send broadcast-raw request");
2559  else
2560  nsent++;
2561  }
2562  }
2563 
2564  return nsent;
2565 }
2566 
2567 ////////////////////////////////////////////////////////////////////////////////
2568 /// Broadcast a raw buffer of specified length to all slaves in the
2569 /// specified list. Returns the number of slaves the buffer was sent to.
2570 /// Returns -1 in case of error.
2571 
2572 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
2573 {
2574  TList *slaves = 0;
2575  if (list == kAll) slaves = fSlaves;
2576  if (list == kActive) slaves = fActiveSlaves;
2577  if (list == kUnique) slaves = fUniqueSlaves;
2578  if (list == kAllUnique) slaves = fAllUniqueSlaves;
2579 
2580  return BroadcastRaw(buffer, length, slaves);
2581 }
2582 
2583 ////////////////////////////////////////////////////////////////////////////////
2584 /// Broadcast file to all workers in the specified list. Returns the number of workers
2585 /// the buffer was sent to.
2586 /// Returns -1 in case of error.
2587 
2588 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2589 {
2590  if (!IsValid()) return -1;
2591 
2592  if (wrks->GetSize() == 0) return 0;
2593 
2594  int nsent = 0;
2595  TIter next(wrks);
2596 
2597  TSlave *wrk;
2598  while ((wrk = (TSlave *)next())) {
2599  if (wrk->IsValid()) {
2600  if (SendFile(file, opt, rfile, wrk) < 0)
2601  Error("BroadcastFile",
2602  "problems sending file to worker %s (%s)",
2603  wrk->GetOrdinal(), wrk->GetName());
2604  else
2605  nsent++;
2606  }
2607  }
2608 
2609  return nsent;
2610 }
2611 
2612 ////////////////////////////////////////////////////////////////////////////////
2613 /// Broadcast file to all workers in the specified list. Returns the number of workers
2614 /// the buffer was sent to.
2615 /// Returns -1 in case of error.
2616 
2617 Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2618 {
2619  TList *wrks = 0;
2620  if (list == kAll) wrks = fSlaves;
2621  if (list == kActive) wrks = fActiveSlaves;
2622  if (list == kUnique) wrks = fUniqueSlaves;
2623  if (list == kAllUnique) wrks = fAllUniqueSlaves;
2624 
2625  return BroadcastFile(file, opt, rfile, wrks);
2626 }
2627 
2628 ////////////////////////////////////////////////////////////////////////////////
2629 /// Release a monitor after use, making sure to delete it if it was newly
2630 /// created, i.e. if it is not one of the monitors owned by this session.
2631 
2633 {
2634  if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2635  && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2636  delete mon;
2637  }
2638 }
2639 
2640 ////////////////////////////////////////////////////////////////////////////////
2641 /// Collect responses from slave sl. Returns the number of slaves that
2642 /// responded (=1).
2643 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2644 /// which means wait forever).
2645 /// If defined (>= 0) endtype is the message that stops this collection.
2646 
2647 Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2648 {
2649  Int_t rc = 0;
2650 
2651  TMonitor *mon = 0;
2652  if (!sl->IsValid()) return 0;
2653 
2654  if (fCurrentMonitor == fAllMonitor) {
2655  mon = new TMonitor;
2656  } else {
2657  mon = fAllMonitor;
2658  mon->DeActivateAll();
2659  }
2660  mon->Activate(sl->GetSocket());
2661 
2662  rc = Collect(mon, timeout, endtype, deactonfail);
2663  ReleaseMonitor(mon);
2664  return rc;
2665 }
2666 
2667 ////////////////////////////////////////////////////////////////////////////////
2668 /// Collect responses from the slave servers. Returns the number of slaves
2669 /// that responded.
2670 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2671 /// which means wait forever).
2672 /// If defined (>= 0) endtype is the message that stops this collection.
2673 
2674 Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2675 {
2676  Int_t rc = 0;
2677 
2678  TMonitor *mon = 0;
2679 
2680  if (fCurrentMonitor == fAllMonitor) {
2681  mon = new TMonitor;
2682  } else {
2683  mon = fAllMonitor;
2684  mon->DeActivateAll();
2685  }
2686  TIter next(slaves);
2687  TSlave *sl;
2688  while ((sl = (TSlave*) next())) {
2689  if (sl->IsValid())
2690  mon->Activate(sl->GetSocket());
2691  }
2692 
2693  rc = Collect(mon, timeout, endtype, deactonfail);
2694  ReleaseMonitor(mon);
2695  return rc;
2696 }
2697 
2698 ////////////////////////////////////////////////////////////////////////////////
2699 /// Collect responses from the slave servers. Returns the number of slaves
2700 /// that responded.
2701 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2702 /// which means wait forever).
2703 /// If defined (>= 0) endtype is the message that stops this collection.
2704 
2705 Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2706 {
2707  Int_t rc = 0;
2708  TMonitor *mon = 0;
2709 
2710  if (list == kAll) mon = fAllMonitor;
2711  if (list == kActive) mon = fActiveMonitor;
2712  if (list == kUnique) mon = fUniqueMonitor;
2713  if (list == kAllUnique) mon = fAllUniqueMonitor;
2714  if (fCurrentMonitor == mon) {
2715  // Get a copy
2716  mon = new TMonitor(*mon);
2717  }
2718  mon->ActivateAll();
2719 
2720  rc = Collect(mon, timeout, endtype, deactonfail);
2721  ReleaseMonitor(mon);
2722  return rc;
2723 }
2724 
2725 ////////////////////////////////////////////////////////////////////////////////
2726 /// Collect responses from the slave servers. Returns the number of messages
2727 /// received. Can be 0 if there are no active slaves.
2728 /// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2729 /// which means wait forever).
2730 /// If defined (>= 0) endtype is the message that stops this collection.
2731 /// Collect also stops its execution from time to time to check for new
2732 /// workers in Dynamic Startup mode.
2733 
2734 Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2735 {
2736  Int_t collectId = gRandom->Integer(9999);
2737 
2738  PDB(kCollect, 3)
2739  Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2740 
2741  // Reset the status flag and clear the messages in the list, if any
2742  fStatus = 0;
2743  fRecvMessages->Clear();
2744 
2745  Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2746 
2747  if (!mon->GetActive(actto)) return 0;
2748 
2750 
2751  // Used by external code to know what we are monitoring
2752  TMonitor *savedMonitor = 0;
2753  if (fCurrentMonitor) {
2754  savedMonitor = fCurrentMonitor;
2755  fCurrentMonitor = mon;
2756  } else {
2757  fCurrentMonitor = mon;
2758  fBytesRead = 0;
2759  fRealTime = 0.0;
2760  fCpuTime = 0.0;
2761  }
2762 
2763  // We want messages on the main window during synchronous collection,
2764  // but we save the present status to restore it at the end
2765  Bool_t saveRedirLog = fRedirLog;
2766  if (!IsIdle() && !IsSync())
2767  fRedirLog = kFALSE;
2768 
2769  int cnt = 0, rc = 0;
2770 
2771  // Timeout counter
2772  Long_t nto = timeout;
2773  PDB(kCollect, 2)
2774  Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2775 
2776  // On clients, handle Ctrl-C during collection
2777  if (fIntHandler)
2778  fIntHandler->Add();
2779 
2780  // Sockets w/o activity during the last 'sto' millisecs are deactivated
2781  Int_t nact = 0;
2782  Long_t sto = -1;
2783  Int_t nsto = 60;
2784  Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2785  mon->ResetInterrupt();
2786  while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2787 
2788  // Dump last waiting sockets, if in debug mode
2789  PDB(kCollect, 2) {
2790  if (nact < 4) {
2791  TList *al = mon->GetListOfActives();
2792  if (al && al->GetSize() > 0) {
2793  Info("Collect"," %d node(s) still active:", al->GetSize());
2794  TIter nxs(al);
2795  TSocket *xs = 0;
2796  while ((xs = (TSocket *)nxs())) {
2797  TSlave *wrk = FindSlave(xs);
2798  if (wrk)
2799  Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2800  else
2801  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2802  xs->GetInetAddress().GetPort());
2803  }
2804  }
2805  }
2806  }
2807 
2808  // Preemptive poll for new workers on the master only in Dynamic Mode and only
2809  // during processing (TODO: should work on Top Master only)
2811  ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2814  fLastPollWorkers_s = time(0);
2816  PDB(kCollect, 1)
2817  Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2818  }
2819 
2820  // Wait for a ready socket
2821  PDB(kCollect, 3)
2822  Info("Collect", "Will invoke Select() #%04d", collectId);
2823  TSocket *s = mon->Select(1000);
2824 
2825  if (s && s != (TSocket *)(-1)) {
2826  // Get and analyse the info it did receive
2827  rc = CollectInputFrom(s, endtype, deactonfail);
2828  if (rc == 1 || (rc == 2 && !savedMonitor)) {
2829  // Deactivate it if we are done with it
2830  mon->DeActivate(s);
2831  PDB(kCollect, 2)
2832  Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2833  s, mon->GetActive(),
2834  mon->GetListOfActives()->First());
2835  } else if (rc == 2) {
2836  // This end message was for the saved monitor
2837  // Deactivate it if we are done with it
2838  if (savedMonitor) {
2839  savedMonitor->DeActivate(s);
2840  PDB(kCollect, 2)
2841  Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2842  s, savedMonitor->GetActive(),
2843  savedMonitor->GetListOfActives()->First());
2844  }
2845  }
2846 
2847  // Update counter (if no error occurred)
2848  if (rc >= 0)
2849  cnt++;
2850  } else {
2851  // If not timed-out, exit if not stopped or not aborted
2852  // (player exits status is finished in such a case); otherwise,
2853  // we still need to collect the partial output info
2854  if (!s)
2856  mon->DeActivateAll();
2857  // Decrease the timeout counter if requested
2858  if (s == (TSocket *)(-1) && nto > 0)
2859  nto--;
2860  }
2861 
2862  // Check if there are workers with ready output to be sent and ask the first to send it
2863  if (IsMaster() && fWrksOutputReady && fWrksOutputReady->GetSize() > 0) {
2864  // Maximum number of concurrent sendings
2865  Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2866  if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2867  mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2868  TIter nxwr(fWrksOutputReady);
2869  TSlave *wrk = 0;
2870  while (mxws && (wrk = (TSlave *) nxwr())) {
2871  if (!wrk->TestBit(TSlave::kOutputRequested)) {
2872  // Ask worker for output
2873  TMessage sendoutput(kPROOF_SENDOUTPUT);
2874  PDB(kCollect, 2)
2875  Info("Collect", "worker %s was asked to send its output to master",
2876  wrk->GetOrdinal());
2877  if (wrk->GetSocket()->Send(sendoutput) != 1) {
2879  mxws--;
2880  }
2881  } else {
2882  // Count
2883  mxws--;
2884  }
2885  }
2886  }
2887 
2888  // Check if we need to check the socket activity (we do it every 60 cycles ~ 60 sec)
2889  sto = -1;
2890  if (--nsto <= 0) {
2891  sto = (Long_t) actto;
2892  nsto = 60;
2893  }
2894 
2895  } // end loop over active monitors
2896 
2897  // If timed-out, deactivate the remaining sockets
2898  if (nto == 0) {
2899  TList *al = mon->GetListOfActives();
2900  if (al && al->GetSize() > 0) {
2901  // Notify the name of those which did timeout
2902  Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2903  TIter nxs(al);
2904  TSocket *xs = 0;
2905  while ((xs = (TSocket *)nxs())) {
2906  TSlave *wrk = FindSlave(xs);
2907  if (wrk)
2908  Info("Collect"," %s", wrk->GetName());
2909  else
2910  Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2911  xs->GetInetAddress().GetPort());
2912  }
2913  }
2914  mon->DeActivateAll();
2915  }
2916 
2917  // Deactivate Ctrl-C special handler
2918  if (fIntHandler)
2919  fIntHandler->Remove();
2920 
2921  // make sure group view is up to date
2922  SendGroupView();
2923 
2924  // Restore redirection setting
2925  fRedirLog = saveRedirLog;
2926 
2927  // Restore the monitor
2928  fCurrentMonitor = savedMonitor;
2929 
2931 
2932  PDB(kCollect, 3)
2933  Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2934 
2935  return cnt;
2936 }
2937 
2938 ////////////////////////////////////////////////////////////////////////////////
2939 /// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2940 /// them. Returns the number of new workers found, or <0 on errors.
2941 
2943 {
2944  // Requests for worker updates
2945  Int_t dummy = 0;
2946  TList *reqWorkers = new TList();
2947  reqWorkers->SetOwner(kFALSE);
2948 
2949  if (!TestBit(TProof::kIsMaster)) {
2950  Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2951  return -1;
2952  }
2953  if (!gProofServ) {
2954  Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2955  return -1;
2956  }
2957 
2958  gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2959 
2960  // List of new workers only (TProofNodeInfo)
2961  TList *newWorkers = new TList();
2962  newWorkers->SetOwner(kTRUE);
2963 
2964  TIter next(reqWorkers);
2965  TProofNodeInfo *ni;
2966  TString fullOrd;
2967  while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2968 
2969  // Form the full ordinal
2970  fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2971 
2972  TIter nextInner(fSlaves);
2973  TSlave *sl;
2974  Bool_t found = kFALSE;
2975  while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2976  if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
2977  found = kTRUE;
2978  break;
2979  }
2980  }
2981 
2982  if (found) delete ni;
2983  else {
2984  newWorkers->Add(ni);
2985  PDB(kGlobal, 1)
2986  Info("PollForNewWorkers", "New worker found: %s:%s",
2987  ni->GetNodeName().Data(), fullOrd.Data());
2988  }
2989  }
2990 
2991  delete reqWorkers; // not owner
2992 
2993  Int_t nNewWorkers = newWorkers->GetEntries();
2994 
2995  // Add the new workers
2996  if (nNewWorkers > 0) {
2997  PDB(kGlobal, 1)
2998  Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
2999  Int_t rv = AddWorkers(newWorkers);
3000  if (rv < 0) {
3001  Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
3002  return -1;
3003  }
3004  // Don't delete newWorkers: AddWorkers() will do that
3005  }
3006  else {
3007  PDB(kGlobal, 2)
3008  Info("PollForNewWorkers", "No new worker found");
3009  delete newWorkers;
3010  }
3011 
3012  return nNewWorkers;
3013 }
3014 
3015 ////////////////////////////////////////////////////////////////////////////////
3016 /// Remove links to objects in list 'ol' from gDirectory
3017 
3019 {
3020  if (ol) {
3021  TIter nxo(ol);
3022  TObject *o = 0;
3023  while ((o = nxo()))
3024  gDirectory->RecursiveRemove(o);
3025  }
3026 }
3027 
3028 ////////////////////////////////////////////////////////////////////////////////
3029 /// Collect and analyze available input from socket s.
3030 /// Returns 0 on success, -1 if any failure occurs.
3031 
3033 {
3034  TMessage *mess;
3035 
3036  Int_t recvrc = 0;
3037  if ((recvrc = s->Recv(mess)) < 0) {
3038  PDB(kCollect,2)
3039  Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3040  Bool_t bad = kTRUE;
3041  if (recvrc == -5) {
3042  // Broken connection: try reconnection
3044  if (s->Reconnect() == 0) {
3046  bad = kFALSE;
3047  }
3048  }
3049  if (bad)
3050  MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3051  // Ignore this wake up
3052  return -1;
3053  }
3054  if (!mess) {
3055  // we get here in case the remote server died
3056  MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3057  return -1;
3058  }
3059  Int_t rc = 0;
3060 
3061  Int_t what = mess->What();
3062  TSlave *sl = FindSlave(s);
3063  rc = HandleInputMessage(sl, mess, deactonfail);
3064  if (rc == 1 && (endtype >= 0) && (what != endtype))
3065  // This message was for the base monitor in recursive case
3066  rc = 2;
3067 
3068  // We are done successfully
3069  return rc;
3070 }
3071 
3072 ////////////////////////////////////////////////////////////////////////////////
3073 /// Analyze the received message.
3074 /// Returns 0 on success (1 if this is the last message from this socket), -1 if
3075 /// any failure occurs.
3076 
3078 {
3079  char str[512];
3080  TObject *obj;
3081  Int_t rc = 0;
3082 
3083  if (!mess || !sl) {
3084  Warning("HandleInputMessage", "given an empty message or undefined worker");
3085  return -1;
3086  }
3087  Bool_t delete_mess = kTRUE;
3088  TSocket *s = sl->GetSocket();
3089  if (!s) {
3090  Warning("HandleInputMessage", "worker socket is undefined");
3091  return -1;
3092  }
3093 
3094  // The message type
3095  Int_t what = mess->What();
3096 
3097  PDB(kCollect,3)
3098  Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3099 
3100  switch (what) {
3101 
3102  case kMESS_OK:
3103  // Add the message to the list
3104  fRecvMessages->Add(mess);
3105  delete_mess = kFALSE;
3106  break;
3107 
3108  case kMESS_OBJECT:
3109  if (fPlayer) fPlayer->HandleRecvHisto(mess);
3110  break;
3111 
3112  case kPROOF_FATAL:
3113  { TString msg;
3114  if ((mess->BufferSize() > mess->Length()))
3115  (*mess) >> msg;
3116  if (msg.IsNull()) {
3117  MarkBad(s, "received kPROOF_FATAL");
3118  } else {
3119  MarkBad(s, msg);
3120  }
3121  }
3122  if (fProgressDialogStarted) {
3123  // Finalize the progress dialog
3124  Emit("StopProcess(Bool_t)", kTRUE);
3125  }
3126  break;
3127 
3128  case kPROOF_STOP:
3129  // Stop collection from this worker
3130  Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3131  sl->GetOrdinal());
3132  rc = 1;
3133  break;
3134 
3135  case kPROOF_GETTREEHEADER:
3136  // Add the message to the list
3137  fRecvMessages->Add(mess);
3138  delete_mess = kFALSE;
3139  rc = 1;
3140  break;
3141 
3142  case kPROOF_TOUCH:
3143  // send a request for touching the remote admin file
3144  {
3145  sl->Touch();
3146  }
3147  break;
3148 
3149  case kPROOF_GETOBJECT:
3150  // send slave object it asks for
3151  mess->ReadString(str, sizeof(str));
3152  obj = gDirectory->Get(str);
3153  if (obj)
3154  s->SendObject(obj);
3155  else
3156  s->Send(kMESS_NOTOK);
3157  break;
3158 
3159  case kPROOF_GETPACKET:
3160  {
3161  PDB(kGlobal,2)
3162  Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3163  TDSetElement *elem = 0;
3164  elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3165 
3166  if (elem != (TDSetElement*) -1) {
3167  TMessage answ(kPROOF_GETPACKET);
3168  answ << elem;
3169  s->Send(answ);
3170 
3171  while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3172  TPair *p = (TPair*) fWaitingSlaves->First();
3173  s = (TSocket*) p->Key();
3174  TMessage *m = (TMessage*) p->Value();
3175 
3176  elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3177  if (elem != (TDSetElement*) -1) {
3179  a << elem;
3180  s->Send(a);
3181  // remove has to happen via Links because TPair does not have
3182  // a Compare() function and therefore RemoveFirst() and
3183  // Remove(TObject*) do not work
3185  delete p;
3186  delete m;
3187  } else {
3188  break;
3189  }
3190  }
3191  } else {
3192  if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3193  fWaitingSlaves->Add(new TPair(s, mess));
3194  delete_mess = kFALSE;
3195  }
3196  }
3197  break;
3198 
3199  case kPROOF_LOGFILE:
3200  {
3201  Int_t size;
3202  (*mess) >> size;
3203  PDB(kGlobal,2)
3204  Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3205  RecvLogFile(s, size);
3206  }
3207  break;
3208 
3209  case kPROOF_LOGDONE:
3210  (*mess) >> sl->fStatus >> sl->fParallel;
3211  PDB(kCollect,2)
3212  Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3213  sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3214  if (sl->fStatus != 0) {
3215  // Return last nonzero status
3216  fStatus = sl->fStatus;
3217  // Deactivate the worker, if required
3218  if (deactonfail) DeactivateWorker(sl->fOrdinal);
3219  }
3220  // Remove from the workers-ready list
3223  fWrksOutputReady->Remove(sl);
3224  }
3225  rc = 1;
3226  break;
3227 
3228  case kPROOF_GETSTATS:
3229  {
3230  (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3231  >> sl->fWorkDir >> sl->fProofWorkDir;
3232  PDB(kCollect,2)
3233  Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3234  TString img;
3235  if ((mess->BufferSize() > mess->Length()))
3236  (*mess) >> img;
3237  // Set image
3238  if (img.IsNull()) {
3239  if (sl->fImage.IsNull())
3240  sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3241  sl->fProofWorkDir.Data());
3242  } else {
3243  sl->fImage = img;
3244  }
3245  PDB(kGlobal,2)
3246  Info("HandleInputMessage",
3247  "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3248 
3249  fBytesRead += sl->fBytesRead;
3250  fRealTime += sl->fRealTime;
3251  fCpuTime += sl->fCpuTime;
3252  rc = 1;
3253  }
3254  break;
3255 
3256  case kPROOF_GETPARALLEL:
3257  {
3258  Bool_t async = kFALSE;
3259  (*mess) >> sl->fParallel;
3260  if ((mess->BufferSize() > mess->Length()))
3261  (*mess) >> async;
3262  rc = (async) ? 0 : 1;
3263  }
3264  break;
3265 
3266  case kPROOF_CHECKFILE:
3267  { // New servers (>= 5.22) send the status
3268  if ((mess->BufferSize() > mess->Length())) {
3269  (*mess) >> fCheckFileStatus;
3270  } else {
3271  // For old servers this meant success (failure was signaled with the
3272  // dangerous kPROOF_FATAL)
3273  fCheckFileStatus = 1;
3274  }
3275  rc = 1;
3276  }
3277  break;
3278 
3279  case kPROOF_SENDFILE:
3280  { // New server: signals ending of sendfile operation
3281  rc = 1;
3282  }
3283  break;
3284 
3285  case kPROOF_PACKAGE_LIST:
3286  {
3287  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3288  Int_t type = 0;
3289  (*mess) >> type;
3290  switch (type) {
3294  if (fEnabledPackages) {
3296  } else {
3297  Error("HandleInputMessage",
3298  "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3299  }
3300  break;
3301  case TProof::kListPackages:
3304  if (fAvailablePackages) {
3306  } else {
3307  Error("HandleInputMessage",
3308  "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3309  }
3310  break;
3311  default:
3312  Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3313  }
3314  }
3315  break;
3316 
3317  case kPROOF_SENDOUTPUT:
3318  {
3319  // We start measuring the merging time
3320  fPlayer->SetMerging();
3321 
3322  // Worker is ready to send output: make sure the relevant bit is reset
3324  PDB(kGlobal,2)
3325  Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3326  // Create the list if not yet done
3327  if (!fWrksOutputReady) {
3328  fWrksOutputReady = new TList;
3330  }
3331  fWrksOutputReady->Add(sl);
3332  }
3333  break;
3334 
3335  case kPROOF_OUTPUTOBJECT:
3336  {
3337  // We start measuring the merging time
3338  fPlayer->SetMerging();
3339 
3340  PDB(kGlobal,2)
3341  Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3342  Int_t type = 0;
3343  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3345  Info("HandleInputMessage", "finalization on %s started ...", prefix);
3347  }
3348 
3349  while ((mess->BufferSize() > mess->Length())) {
3350  (*mess) >> type;
3351  // If a query result header, add it to the player list
3352  if (fPlayer) {
3353  if (type == 0) {
3354  // Retrieve query result instance (output list not filled)
3355  TQueryResult *pq =
3357  if (pq) {
3358  // Add query to the result list in TProofPlayer
3359  fPlayer->AddQueryResult(pq);
3360  fPlayer->SetCurrentQuery(pq);
3361  // And clear the output list, as we start merging a new set of results
3362  if (fPlayer->GetOutputList())
3363  fPlayer->GetOutputList()->Clear();
3364  // Add the unique query tag as TNamed object to the input list
3365  // so that it is available in TSelectors for monitoring
3366  TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3367  if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3368  fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3369  fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3370  } else {
3371  Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3372  }
3373  } else if (type > 0) {
3374  // Read object
3375  TObject *o = mess->ReadObject(TObject::Class());
3376  // Increment counter on the client side
3378  TString msg;
3379  Bool_t changed = kFALSE;
3380  msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3381  if (gProofServ) {
3383  } else if (IsTty() || changed) {
3384  fprintf(stderr, "%s\r", msg.Data());
3385  }
3386  // Add or merge it
3387  if ((fPlayer->AddOutputObject(o) == 1)) {
3388  // Remove the object if it has been merged
3389  SafeDelete(o);
3390  }
3391  if (type > 1) {
3392  // Update the merger progress info
3394  if (TestBit(TProof::kIsClient) && !IsLite()) {
3395  // In PROOFLite this has to be done once only in TProofLite::Process
3397  if (pq) {
3399  // Add input objects (do not override remote settings, if any)
3400  TObject *xo = 0;
3401  TIter nxin(fPlayer->GetInputList());
3402  // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3403  if (!pq->GetInputList()) pq->SetInputList(new TList());
3404  while ((xo = nxin()))
3405  if (!pq->GetInputList()->FindObject(xo->GetName()))
3406  pq->AddInput(xo->Clone());
3407  // If the last object, notify the GUI that the result arrived
3408  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3409  }
3410  // Processing is over
3411  UpdateDialog();
3412  }
3413  }
3414  }
3415  } else {
3416  Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3417  }
3418  }
3419  }
3420  break;
3421 
3422  case kPROOF_OUTPUTLIST:
3423  {
3424  // We start measuring the merging time
3425  fPlayer->SetMerging();
3426 
3427  PDB(kGlobal,2)
3428  Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3429  TList *out = 0;
3430  if (fPlayer) {
3431  if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3432  out = (TList *) mess->ReadObject(TList::Class());
3433  } else {
3434  TQueryResult *pq =
3436  if (pq) {
3437  // Add query to the result list in TProofPlayer
3438  fPlayer->AddQueryResult(pq);
3439  fPlayer->SetCurrentQuery(pq);
3440  // To avoid accidental cleanups from anywhere else
3441  // remove objects from gDirectory and clone the list
3442  out = pq->GetOutputList();
3443  CleanGDirectory(out);
3444  out = (TList *) out->Clone();
3445  // Notify the GUI that the result arrived
3446  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3447  } else {
3448  PDB(kGlobal,2)
3449  Info("HandleInputMessage",
3450  "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3451  }
3452  }
3453  if (out) {
3454  out->SetOwner();
3455  fPlayer->AddOutput(out); // Incorporate the list
3456  SafeDelete(out);
3457  } else {
3458  PDB(kGlobal,2)
3459  Info("HandleInputMessage",
3460  "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3461  }
3462  } else {
3463  Warning("HandleInputMessage",
3464  "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3465  }
3466  // On clients at this point processing is over
3467  if (TestBit(TProof::kIsClient) && !IsLite())
3468  UpdateDialog();
3469  }
3470  break;
3471 
3472  case kPROOF_QUERYLIST:
3473  {
3474  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3475  (*mess) >> fOtherQueries >> fDrawQueries;
3476  if (fQueries) {
3477  fQueries->Delete();
3478  delete fQueries;
3479  fQueries = 0;
3480  }
3481  fQueries = (TList *) mess->ReadObject(TList::Class());
3482  }
3483  break;
3484 
3485  case kPROOF_RETRIEVE:
3486  {
3487  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3488  TQueryResult *pq =
3490  if (pq && fPlayer) {
3491  fPlayer->AddQueryResult(pq);
3492  // Notify the GUI that the result arrived
3493  QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3494  } else {
3495  PDB(kGlobal,2)
3496  Info("HandleInputMessage",
3497  "kPROOF_RETRIEVE: query result missing or player undefined");
3498  }
3499  }
3500  break;
3501 
3502  case kPROOF_MAXQUERIES:
3503  {
3504  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3505  Int_t max = 0;
3506 
3507  (*mess) >> max;
3508  Printf("Number of queries fully kept remotely: %d", max);
3509  }
3510  break;
3511 
3512  case kPROOF_SERVERSTARTED:
3513  {
3514  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3515 
3516  UInt_t tot = 0, done = 0;
3517  TString action;
3518  Bool_t st = kTRUE;
3519 
3520  (*mess) >> action >> tot >> done >> st;
3521 
3522  if (TestBit(TProof::kIsClient)) {
3523  if (tot) {
3524  TString type = (action.Contains("submas")) ? "submasters"
3525  : "workers";
3526  Int_t frac = (Int_t) (done*100.)/tot;
3527  char msg[512] = {0};
3528  if (frac >= 100) {
3529  snprintf(msg, 512, "%s: OK (%d %s) \n",
3530  action.Data(),tot, type.Data());
3531  } else {
3532  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3533  action.Data(), done, tot, frac);
3534  }
3535  if (fSync)
3536  fprintf(stderr,"%s", msg);
3537  else
3538  NotifyLogMsg(msg, 0);
3539  }
3540  // Notify GUIs
3541  StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3542  } else {
3543 
3544  // Just send the message one level up
3546  m << action << tot << done << st;
3547  gProofServ->GetSocket()->Send(m);
3548  }
3549  }
3550  break;
3551 
3552  case kPROOF_DATASET_STATUS:
3553  {
3554  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3555 
3556  UInt_t tot = 0, done = 0;
3557  TString action;
3558  Bool_t st = kTRUE;
3559 
3560  (*mess) >> action >> tot >> done >> st;
3561 
3562  if (TestBit(TProof::kIsClient)) {
3563  if (tot) {
3564  TString type = "files";
3565  Int_t frac = (Int_t) (done*100.)/tot;
3566  char msg[512] = {0};
3567  if (frac >= 100) {
3568  snprintf(msg, 512, "%s: OK (%d %s) \n",
3569  action.Data(),tot, type.Data());
3570  } else {
3571  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3572  action.Data(), done, tot, frac);
3573  }
3574  if (fSync)
3575  fprintf(stderr,"%s", msg);
3576  else
3577  NotifyLogMsg(msg, 0);
3578  }
3579  // Notify GUIs
3580  DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3581  } else {
3582 
3583  // Just send the message one level up
3585  m << action << tot << done << st;
3586  gProofServ->GetSocket()->Send(m);
3587  }
3588  }
3589  break;
3590 
3591  case kPROOF_STARTPROCESS:
3592  {
3593  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3594 
3595  // For Proof-Lite this variable is the number of workers and is set
3596  // by the player
3597  if (!IsLite()) {
3598  fNotIdle = 1;
3599  fIsWaiting = kFALSE;
3600  }
3601 
3602  // Redirect the output, if needed
3603  fRedirLog = (fSync) ? fRedirLog : kTRUE;
3604 
3605  // The signal is used on masters by XrdProofdProtocol to catch
3606  // the start of processing; on clients it allows to update the
3607  // progress dialog
3608  if (!TestBit(TProof::kIsMaster)) {
3609 
3610  // This is the end of preparation
3611  fQuerySTW.Stop();
3613  PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3614 
3615  TString selec;
3616  Int_t dsz = -1;
3617  Long64_t first = -1, nent = -1;
3618  (*mess) >> selec >> dsz >> first >> nent;
3619  // Start or reset the progress dialog
3620  if (!gROOT->IsBatch()) {
3621  if (fProgressDialog &&
3623  if (!fProgressDialogStarted) {
3624  fProgressDialog->ExecPlugin(5, this,
3625  selec.Data(), dsz, first, nent);
3627  } else {
3628  ResetProgressDialog(selec, dsz, first, nent);
3629  }
3630  }
3632  }
3633  }
3634  }
3635  break;
3636 
3637  case kPROOF_ENDINIT:
3638  {
3639  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3640 
3641  if (TestBit(TProof::kIsMaster)) {
3642  if (fPlayer)
3643  fPlayer->SetInitTime();
3644  }
3645  }
3646  break;
3647 
3648  case kPROOF_SETIDLE:
3649  {
3650  PDB(kGlobal,2)
3651  Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3652 
3653  // The session is idle
3654  if (IsLite()) {
3655  if (fNotIdle > 0) {
3656  fNotIdle--;
3657  PDB(kGlobal,2)
3658  Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3659  } else {
3660  Warning("HandleInputMessage",
3661  "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3662  sl->GetOrdinal());
3663  }
3664  } else {
3665  fNotIdle = 0;
3666  // Check if the query has been enqueued
3667  if ((mess->BufferSize() > mess->Length()))
3668  (*mess) >> fIsWaiting;
3669  }
3670  }
3671  break;
3672 
3673  case kPROOF_QUERYSUBMITTED:
3674  {
3675  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3676 
3677  // We have received the sequential number
3678  (*mess) >> fSeqNum;
3679  Bool_t sync = fSync;
3680  if ((mess->BufferSize() > mess->Length()))
3681  (*mess) >> sync;
3682  if (sync != fSync && fSync) {
3683  // The server required to switch to asynchronous mode
3684  Activate();
3685  fSync = kFALSE;
3686  }
3687  DisableGoAsyn();
3688  // Check if the query has been enqueued
3689  fIsWaiting = kTRUE;
3690  // For Proof-Lite this variable is the number of workers and is set by the player
3691  if (!IsLite())
3692  fNotIdle = 1;
3693 
3694  rc = 1;
3695  }
3696  break;
3697 
3698  case kPROOF_SESSIONTAG:
3699  {
3700  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3701 
3702  // We have received the unique tag and save it as name of this object
3703  TString stag;
3704  (*mess) >> stag;
3705  SetName(stag);
3706  // In the TSlave object
3707  sl->SetSessionTag(stag);
3708  // Server may have also sent the group
3709  if ((mess->BufferSize() > mess->Length()))
3710  (*mess) >> fGroup;
3711  // Server may have also sent the user
3712  if ((mess->BufferSize() > mess->Length())) {
3713  TString usr;
3714  (*mess) >> usr;
3715  if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3716  }
3717  }
3718  break;
3719 
3720  case kPROOF_FEEDBACK:
3721  {
3722  PDB(kGlobal,2)
3723  Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3724  TList *out = (TList *) mess->ReadObject(TList::Class());
3725  out->SetOwner();
3726  if (fPlayer)
3727  fPlayer->StoreFeedback(sl, out); // Adopts the list
3728  else
3729  // Not yet ready: stop collect asap
3730  rc = 1;
3731  }
3732  break;
3733 
3734  case kPROOF_AUTOBIN:
3735  {
3736  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3737 
3738  TString name;
3739  Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3740 
3741  (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3742 
3743  if (fPlayer) fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
3744 
3745  TMessage answ(kPROOF_AUTOBIN);
3746 
3747  answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3748 
3749  s->Send(answ);
3750  }
3751  break;
3752 
3753  case kPROOF_PROGRESS:
3754  {
3755  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3756 
3757  if (GetRemoteProtocol() > 25) {
3758  // New format
3759  TProofProgressInfo *pi = 0;
3760  (*mess) >> pi;
3761  fPlayer->Progress(sl,pi);
3762  } else if (GetRemoteProtocol() > 11) {
3763  Long64_t total, processed, bytesread;
3764  Float_t initTime, procTime, evtrti, mbrti;
3765  (*mess) >> total >> processed >> bytesread
3766  >> initTime >> procTime
3767  >> evtrti >> mbrti;
3768  if (fPlayer)
3769  fPlayer->Progress(sl, total, processed, bytesread,
3770  initTime, procTime, evtrti, mbrti);
3771 
3772  } else {
3773  // Old format
3774  Long64_t total, processed;
3775  (*mess) >> total >> processed;
3776  if (fPlayer)
3777  fPlayer->Progress(sl, total, processed);
3778  }
3779  }
3780  break;
3781 
3782  case kPROOF_STOPPROCESS:
3783  {
3784  // This message is sent from a worker that finished processing.
3785  // We determine whether it was asked to finish by the
3786  // packetizer or stopped during processing a packet
3787  // (by TProof::RemoveWorkers() or by an external signal).
3788  // In the latter case call packetizer->MarkBad.
3789  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3790 
3791  Long64_t events = 0;
3792  Bool_t abort = kFALSE;
3793  TProofProgressStatus *status = 0;
3794 
3795  if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3796  (*mess) >> status >> abort;
3797  } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3798  (*mess) >> events >> abort;
3799  } else {
3800  (*mess) >> events;
3801  }
3802  if (fPlayer) {
3803  if (fProtocol > 18) {
3804  TList *listOfMissingFiles = 0;
3805  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3806  listOfMissingFiles = new TList();
3807  listOfMissingFiles->SetName("MissingFiles");
3808  if (fPlayer)
3809  fPlayer->AddOutputObject(listOfMissingFiles);
3810  }
3811  if (fPlayer->GetPacketizer()) {
3812  Int_t ret =
3813  fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3814  if (ret > 0)
3815  fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3816  // This object is now owned by the packetizer
3817  status = 0;
3818  }
3819  if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3820  } else {
3821  fPlayer->AddEventsProcessed(events);
3822  }
3823  }
3824  SafeDelete(status);
3825  if (!TestBit(TProof::kIsMaster))
3826  Emit("StopProcess(Bool_t)", abort);
3827  break;
3828  }
3829 
3830  case kPROOF_SUBMERGER:
3831  {
3832  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3833  HandleSubmerger(mess, sl);
3834  }
3835  break;
3836 
3837  case kPROOF_GETSLAVEINFO:
3838  {
3839  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3840 
3841  Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3842  Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3843  TList* tmpinfo = 0;
3844  (*mess) >> tmpinfo;
3845  if (tmpinfo == 0) {
3846  Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3847  } else {
3848  tmpinfo->SetOwner(kFALSE);
3849  Int_t nentries = tmpinfo->GetSize();
3850  for (Int_t i=0; i<nentries; i++) {
3851  TSlaveInfo* slinfo =
3852  dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3853  if (slinfo) {
3854  // If PROOF-Lite
3855  if (IsLite()) slinfo->fHostName = gSystem->HostName();
3856  // Check if we have already a instance for this worker
3857  TIter nxw(fSlaveInfo);
3858  TSlaveInfo *ourwi = 0;
3859  while ((ourwi = (TSlaveInfo *)nxw())) {
3860  if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3861  ourwi->SetSysInfo(slinfo->GetSysInfo());
3862  ourwi->fHostName = slinfo->GetName();
3863  if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3864  ourwi->fDataDir = slinfo->GetDataDir();
3865  break;
3866  }
3867  }
3868  if (!ourwi) {
3869  fSlaveInfo->Add(slinfo);
3870  } else {
3871  slinfo = ourwi;
3872  }
3873  if (slinfo->fStatus != TSlaveInfo::kBad) {
3874  if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3875  if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3876  }
3877  if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3878  slinfo->fMsd = sl->GetMsd();
3879  }
3880  }
3881  delete tmpinfo;
3882  rc = 1;
3883  }
3884  }
3885  break;
3886 
3887  case kPROOF_VALIDATE_DSET:
3888  {
3889  PDB(kGlobal,2)
3890  Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3891  TDSet* dset = 0;
3892  (*mess) >> dset;
3893  if (!fDSet)
3894  Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3895  else
3896  fDSet->Validate(dset);
3897  delete dset;
3898  }
3899  break;
3900 
3901  case kPROOF_DATA_READY:
3902  {
3903  PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3904  Bool_t dataready = kFALSE;
3905  Long64_t totalbytes, bytesready;
3906  (*mess) >> dataready >> totalbytes >> bytesready;
3907  fTotalBytes += totalbytes;
3908  fBytesReady += bytesready;
3909  if (dataready == kFALSE) fDataReady = dataready;
3910  }
3911  break;
3912 
3913  case kPROOF_PING:
3914  // do nothing (ping is already acknowledged)
3915  break;
3916 
3917  case kPROOF_MESSAGE:
3918  {
3919  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3920 
3921  // Read the message text and, if the sender provided it, the line-feed flag
3922  TString msg;
3923  (*mess) >> msg;
3924  Bool_t lfeed = kTRUE;
3925  if ((mess->BufferSize() > mess->Length()))
3926  (*mess) >> lfeed;
3927 
3928  if (TestBit(TProof::kIsClient)) {
3929 
3930  if (fSync) {
3931  // Notify locally
3932  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3933  } else {
3934  // Notify locally taking care of redirection, windows logs, ...
3935  NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3936  }
3937  } else {
3938 
3939  // The message is logged for debugging purposes.
3940  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3941  if (gProofServ) {
3942  // We hide it during normal operations
3944 
3945  // And send the message one level up
3946  gProofServ->SendAsynMessage(msg, lfeed);
3947  }
3948  }
3949  }
3950  break;
3951 
3952  case kPROOF_VERSARCHCOMP:
3953  {
3954  TString vac;
3955  (*mess) >> vac;
3956  PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3957  Int_t from = 0;
3958  TString vers, archcomp;
3959  if (vac.Tokenize(vers, from, "|"))
3960  vac.Tokenize(archcomp, from, "|");
3961  sl->SetArchCompiler(archcomp);
3962  vers.ReplaceAll(":","|");
3963  sl->SetROOTVersion(vers);
3964  }
3965  break;
3966 
3967  default:
3968  {
3969  Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3970  sl->GetOrdinal(), what);
3971  }
3972  break;
3973  }
3974 
3975  // Cleanup
3976  if (delete_mess)
3977  delete mess;
3978 
3979  // We are done successfully
3980  return rc;
3981 }
3982 
3983 ////////////////////////////////////////////////////////////////////////////////
3984 /// Process a message of type kPROOF_SUBMERGER
3985 
3987 {
3988  // Message sub-type
3989  Int_t type = 0;
3990  (*mess) >> type;
3991  TSocket *s = sl->GetSocket();
3992 
3993  switch (type) {
3994  case kOutputSent:
3995  {
3996  if (IsEndMaster()) {
3997  Int_t merger_id = -1;
3998  (*mess) >> merger_id;
3999 
4000  PDB(kSubmerger, 2)
4001  Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4002  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4003 
4004  if (!fMergers || fMergers->GetSize() <= merger_id) {
4005  Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4006  break;
4007  }
4008  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4009  mi->SetMergedWorker();
4010  if (mi->AreAllWorkersMerged()) {
4011  mi->Deactivate();
4012  if (GetActiveMergersCount() == 0) {
4013  fMergers->Clear();
4014  delete fMergers;
4015  fMergersSet = kFALSE;
4016  fMergersCount = -1;
4017  fLastAssignedMerger = 0;
4018  PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4019  }
4020  }
4021  } else {
4022  PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4023  }
4024  }
4025  break;
4026 
4027  case kMergerDown:
4028  {
4029  Int_t merger_id = -1;
4030  (*mess) >> merger_id;
4031 
4032  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4033 
4034  if (!fMergers || fMergers->GetSize() <= merger_id) {
4035  Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4036  break;
4037  }
4038 
4039  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4040  if (!mi->IsActive()) {
4041  break;
4042  } else {
4043  mi->Deactivate();
4044  }
4045 
4046  // Stop the invalid merger in the case it is still listening
4047  TMessage stop(kPROOF_SUBMERGER);
4048  stop << Int_t(kStopMerging);
4049  stop << 0;
4050  s->Send(stop);
4051 
4052  // Ask for results from merger (only original results from this node as worker are returned)
4053  AskForOutput(mi->GetMerger());
4054 
4055  // Ask for results from all workers assigned to this merger
4056  TIter nxo(mi->GetWorkers());
4057  TObject * o = 0;
4058  while ((o = nxo())) {
4059  AskForOutput((TSlave *)o);
4060  }
4061  PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4062  }
4063  break;
4064 
4065  case kOutputSize:
4066  {
4067  if (IsEndMaster()) {
4068  PDB(kSubmerger, 2)
4069  Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4070 
4071  const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4072  if (!fFinalizationRunning) {
4073  Info("HandleSubmerger", "finalization on %s started ...", prefix);
4075  }
4076 
4077  Int_t output_size = 0;
4078  Int_t merging_port = 0;
4079  (*mess) >> output_size >> merging_port;
4080 
4081  PDB(kSubmerger, 2) Info("HandleSubmerger",
4082  "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4083  sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4084  TString msg;
4085  if (!fMergersSet) {
4086 
4088 
4089  // First pass - setting number of mergers according to user or dynamically
4090  fMergersCount = -1; // No mergers used if not set by user
4091  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4092  if (mc) fMergersCount = mc->GetVal(); // Value set by user
4093  TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4094  if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4095 
4096  // Mergers count specified by user but not valid
4097  if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4098  msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4099  prefix, fMergersCount, activeWorkers);
4100  if (gProofServ)
4102  else
4103  Printf("%s",msg.Data());
4104  fMergersCount = 0;
4105  }
4106  // Mergers count will be set dynamically
4107  if ((fMergersCount == 0) && (!fMergersByHost)) {
4108  if (activeWorkers > 1) {
4109  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4110  if (activeWorkers / fMergersCount < 2)
4111  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4112  }
4113  if (fMergersCount > 1)
4114  msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4115  prefix, fMergersCount, activeWorkers);
4116  else {
4117  msg.Form("%s: No mergers will be used for %d workers",
4118  prefix, activeWorkers);
4119  fMergersCount = -1;
4120  }
4121  if (gProofServ)
4123  else
4124  Printf("%s",msg.Data());
4125  } else if (fMergersByHost) {
4126  // We force mergers at host level to minimize network traffic
4127  if (activeWorkers > 1) {
4128  fMergersCount = 0;
4129  THashList hosts;
4130  TIter nxwk(fSlaves);
4131  TObject *wrk = 0;
4132  while ((wrk = nxwk())) {
4133  if (!hosts.FindObject(wrk->GetName())) {
4134  hosts.Add(new TObjString(wrk->GetName()));
4135  fMergersCount++;
4136  }
4137  }
4138  }
4139  if (fMergersCount > 1)
4140  msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4141  prefix, fMergersCount, activeWorkers);
4142  else {
4143  msg.Form("%s: No mergers will be used for %d workers",
4144  prefix, activeWorkers);
4145  fMergersCount = -1;
4146  }
4147  if (gProofServ)
4149  else
4150  Printf("%s",msg.Data());
4151  } else {
4152  msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4153  prefix, fMergersCount, activeWorkers);
4154  if (gProofServ)
4156  else
4157  Printf("%s",msg.Data());
4158  }
4159 
4160  // We started merging; we call it here because fMergersCount is still the original number
4161  // and can be saved internally
4163 
4164  // Update merger counters (new workers are not yet active)
4166 
4167  if (fMergersCount > 0) {
4168 
4169  fMergers = new TList();
4170  fLastAssignedMerger = 0;
4171  // Total number of workers, which will not act as mergers ('pure workers')
4172  fWorkersToMerge = (activeWorkers - fMergersCount);
4173  // Establish the first merger
4174  if (!CreateMerger(sl, merging_port)) {
4175  // Cannot establish first merger
4176  AskForOutput(sl);
4177  fWorkersToMerge--;
4178  fMergersCount--;
4179  }
4181  } else {
4182  AskForOutput(sl);
4183  }
4184  fMergersSet = kTRUE;
4185  } else {
4186  // Multiple pass
4187  if (fMergersCount == -1) {
4188  // No mergers. Workers send their outputs directly to master
4189  AskForOutput(sl);
4190  } else {
4191  if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4192  RedirectWorker(s, sl, output_size);
4193  fRedirectNext--;
4194  } else {
4195  Bool_t newMerger = kTRUE;
4196  if (fMergersByHost) {
4197  TIter nxmg(fMergers);
4198  TMergerInfo *mgi = 0;
4199  while ((mgi = (TMergerInfo *) nxmg())) {
4200  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4201  newMerger = kFALSE;
4202  break;
4203  }
4204  }
4205  }
4206  if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4207  // Still not enough mergers established
4208  if (!CreateMerger(sl, merging_port)) {
4209  // Cannot establish a merger
4210  AskForOutput(sl);
4211  fWorkersToMerge--;
4212  fMergersCount--;
4213  }
4214  } else
4215  RedirectWorker(s, sl, output_size);
4216  }
4217  }
4218  }
4219  } else {
4220  Error("HandleSubMerger","kOutputSize received not on endmaster!");
4221  }
4222  }
4223  break;
4224  }
4225 }
4226 
4227 ////////////////////////////////////////////////////////////////////////////////
4228 /// Redirect output of worker sl to some merger
4229 
4230 void TProof::RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size)
4231 {
4232  Int_t merger_id = -1;
4233 
4234  if (fMergersByHost) {
4235  for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4236  TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4237  if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4238  merger_id = i;
4239  break;
4240  }
4241  }
4242  } else {
4243  merger_id = FindNextFreeMerger();
4244  }
4245 
4246  if (merger_id == -1) {
4247  // No free merger (probably it had crashed before)
4248  AskForOutput(sl);
4249  } else {
4250  TMessage sendoutput(kPROOF_SUBMERGER);
4251  sendoutput << Int_t(kSendOutput);
4252  PDB(kSubmerger, 2)
4253  Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4254 
4255  PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4256  if (!fMergers || fMergers->GetSize() <= merger_id) {
4257  Error("RedirectWorker", "#%d not in list ", merger_id);
4258  return;
4259  }
4260  TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4261 
4262  TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName();
4263  sendoutput << merger_id;
4264  sendoutput << hname;
4265  sendoutput << mi->GetPort();
4266  s->Send(sendoutput);
4267  mi->AddMergedObjects(output_size);
4268  mi->AddWorker(sl);
4269  }
4270 }
4271 
4272 ////////////////////////////////////////////////////////////////////////////////
4273 /// Return a merger, which is both active and still accepts some workers to be
4274 /// assigned to it. It works on the 'round-robin' basis.
4275 
4277 {
      // NOTE(review): listing lines 4276, 4281 and 4293 are absent from this
      // copy - presumably the function signature and the bodies of the two
      // while loops (which have to advance fLastAssignedMerger for the loops
      // to terminate); confirm against the original file.

      // First scan: starting at fLastAssignedMerger, skip the mergers which
      // are not active or which already have all their workers assigned.
4278  while (fLastAssignedMerger < fMergers->GetSize() &&
4279  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4280  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4282  }
4283 
      // End of list reached: restart from the first merger; otherwise return
      // the index found and move the cursor past it for the next call.
4284  if (fLastAssignedMerger == fMergers->GetSize()) {
4285  fLastAssignedMerger = 0;
4286  } else {
4287  return fLastAssignedMerger++;
4288  }
4289 
      // Second scan, same condition, now starting from the first merger.
4290  while (fLastAssignedMerger < fMergers->GetSize() &&
4291  (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4292  ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4294  }
4295 
      // -1 signals that no merger can take additional workers.
4296  if (fLastAssignedMerger == fMergers->GetSize()) {
4297  return -1;
4298  } else {
4299  return fLastAssignedMerger++;
4300  }
4301 }
4302 
4303 ////////////////////////////////////////////////////////////////////////////////
4304 /// Master asks for output from worker sl
4305 
4307 {
      // NOTE(review): listing line 4306 (presumably the function signature,
      // taking the worker 'sl') is absent from this copy; confirm against the
      // original file.

      // Same kSendOutput request used when redirecting to a merger, but the
      // three fields following the request code are filled with -1, "master"
      // and -1.
4308  TMessage sendoutput(kPROOF_SUBMERGER);
4309  sendoutput << Int_t(kSendOutput);
4310 
4311  PDB(kSubmerger, 2) Info("AskForOutput",
4312  "worker %s was asked to send its output to master",
4313  sl->GetOrdinal());
4314 
4315  sendoutput << -1;
4316  sendoutput << TString("master");
4317  sendoutput << -1;
4318  sl->GetSocket()->Send(sendoutput);
      // In lite mode also update the worker count of the merge progress object
4319  if (IsLite()) fMergePrg.IncreaseNWrks();
4320 }
4321 
4322 ////////////////////////////////////////////////////////////////////////////////
4323 /// Final update of the progress dialog
4324 
4326 {
      // NOTE(review): listing lines 4325, 4330, 4334, 4340, 4346, 4350 and
      // 4358 are absent from this copy - presumably the function signature,
      // the conditions opening the abort/stop sections, the last argument of
      // the two Info() calls and the Progress() calls of the oldest protocol;
      // confirm against the original file.

      // Nothing to update without a player
4327  if (!fPlayer) return;
4328 
4329  // Handle abort ...
4331  if (fSync)
4332  Info("UpdateDialog",
4333  "processing was aborted - %lld events processed",
4335 
      // The Progress() variant to call depends on the remote protocol version
4336  if (GetRemoteProtocol() > 11) {
4337  // New format
4338  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4339  } else {
4341  }
4342  Emit("StopProcess(Bool_t)", kTRUE);
4343  }
4344 
4345  // Handle stop ...
4347  if (fSync)
4348  Info("UpdateDialog",
4349  "processing was stopped - %lld events processed",
4351 
4352  if (GetRemoteProtocol() > 25) {
4353  // New format
4354  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4355  } else if (GetRemoteProtocol() > 11) {
4356  Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4357  } else {
4359  }
4360  Emit("StopProcess(Bool_t)", kFALSE);
4361  }
4362 
4363  // Final update of the dialog box
      // All values are emitted as -1; the signal signature (10, 7 or 2
      // arguments) is again chosen from the remote protocol version.
4364  if (GetRemoteProtocol() > 25) {
4365  // New format
4366  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4367  10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4368  (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4369  } else if (GetRemoteProtocol() > 11) {
4370  // New format
4371  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4372  7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4373  (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4374  } else {
4375  EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4376  }
4377 }
4378 
4379 ////////////////////////////////////////////////////////////////////////////////
4380 /// Activate the a-sync input handler.
4381 
4383 {
      // NOTE(review): listing line 4382 (presumably the function signature) is
      // absent from this copy; confirm against the original file.

      // Call Add() on the input handler of every entry of fSlaves having one
4384  TIter next(fSlaves);
4385  TSlave *sl;
4386 
4387  while ((sl = (TSlave*) next()))
4388  if (sl->GetInputHandler())
4389  sl->GetInputHandler()->Add();
4390 }
4391 
4392 ////////////////////////////////////////////////////////////////////////////////
4393 /// De-activate a-sync input handler.
4394 
4396 {
      // NOTE(review): listing line 4395 (presumably the function signature) is
      // absent from this copy; confirm against the original file.

      // Call Remove() on the input handler of every entry of fSlaves having one
4397  TIter next(fSlaves);
4398  TSlave *sl;
4399 
4400  while ((sl = (TSlave*) next()))
4401  if (sl->GetInputHandler())
4402  sl->GetInputHandler()->Remove();
4403 }
4404 
4405 ////////////////////////////////////////////////////////////////////////////////
4406 /// Get the active mergers count
4407 
4409 {
      // NOTE(review): listing line 4408 (presumably the function signature) is
      // absent from this copy; confirm against the original file.

      // No list of mergers: no active mergers
4410  if (!fMergers) return 0;
4411 
4412  Int_t active_mergers = 0;
4413 
      // Count the entries of fMergers for which IsActive() is true
4414  TIter mergers(fMergers);
4415  TMergerInfo *mi = 0;
4416  while ((mi = (TMergerInfo *)mergers())) {
4417  if (mi->IsActive()) active_mergers++;
4418  }
4419 
4420  return active_mergers;
4421 }
4422 
4423 ////////////////////////////////////////////////////////////////////////////////
4424 /// Create a new merger
4425 
4427 {
      // NOTE(review): listing lines 4426 and 4463 are absent from this copy -
      // presumably the function signature (taking the worker 'sl' and the
      // 'port') and the statement executed when gProofServ is set; confirm
      // against the original file.
4428  PDB(kSubmerger, 2)
4429  Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4430 
4431  PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4432 
      // A merger cannot be set up without a valid port
4433  if (port <= 0) {
4434  PDB(kSubmerger,2)
4435  Info("CreateMerger", "cannot create merger on port %d - exit", port);
4436  return kFALSE;
4437  }
4438 
      // Number of additional workers this merger will serve
4439  Int_t workers = -1;
4440  if (!fMergersByHost) {
      // Share the remaining pure workers among the mergers still to be created.
      // NOTE(review): relies on the caller to guarantee that fMergersCount is
      // larger than the current size of fMergers, otherwise mergersToCreate
      // would be zero in the modulo and division below.
4441  Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4442  // Number of pure workers, which are not simply divisible by mergers
4443  Int_t rest = fWorkersToMerge % mergersToCreate;
4444  // We add one more worker for each of the first 'rest' mergers being established
4445  if (rest > 0 && fMergers->GetSize() < rest) {
4446  rest = 1;
4447  } else {
4448  rest = 0;
4449  }
4450  workers = (fWorkersToMerge / mergersToCreate) + rest;
4451  } else {
      // One merger per host: it serves all the other active workers whose name
      // is the same as its own
4452  Int_t workersOnHost = 0;
4453  for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4454  if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4455  }
4456  workers = workersOnHost - 1;
4457  }
4458 
4459  TString msg;
4460  msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4461 
4462  if (gProofServ) {
4464  } else {
4465  Printf("%s",msg.Data());
4466  }
4467  TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4468 
      // Tell the worker to act as merger: the message carries the index the
      // merger will have in fMergers and the number of workers to expect
4469  TMessage bemerger(kPROOF_SUBMERGER);
4470  bemerger << Int_t(kBeMerger);
4471  bemerger << fMergers->GetSize();
4472  bemerger << workers;
4473  sl->GetSocket()->Send(bemerger);
4474 
4475  PDB(kSubmerger,2) Info("CreateMerger",
4476  "merger #%d (port: %d) for %d workers started",
4477  fMergers->GetSize(), port, workers);
4478 
      // Register the merger and update the count of workers still to be assigned
4479  fMergers->Add(merger);
4480  fWorkersToMerge = fWorkersToMerge - workers;
4481 
4482  fRedirectNext = workers / 2;
4483 
4484  PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4485  return kTRUE;
4486 }
4487 
4488 ////////////////////////////////////////////////////////////////////////////////
4489 /// Add a bad slave server to the bad slave list and remove it from
4490 /// the active list and from the two monitor objects. Assume that the work
4491 /// done by this worker was lost and ask the packetizer to reassign it.
4492 
4493 void TProof::MarkBad(TSlave *wrk, const char *reason)
4494 {
      // NOTE(review): listing lines 4540, 4590, 4610, 4620 and 4629 are absent
      // from this copy (e.g. the declaration of 'activeWorkers' used near the
      // end and the 'if' matching the 'else delete si;'); confirm against the
      // original file before changing this function.
      //
      // 'reason' drives the behaviour: kPROOF_TerminateWorker is handled as a
      // planned termination (the worker is removed from all lists and
      // deleted), kPROOF_WorkerIdleTO invalidates a client session, anything
      // else (including a null reason) is reported as a failure of the worker.
4495  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4496 
4497  // We may have been invalidated in the meanwhile: nothing to do in such a case
4498  if (!IsValid()) return;
4499 
4500  if (!wrk) {
4501  Error("MarkBad", "worker instance undefined: protocol error? ");
4502  return;
4503  }
4504 
4505  // Local URL
      // Built on the first call only and then reused (function-local static)
4506  static TString thisurl;
4507  if (thisurl.IsNull()) {
4508  if (IsMaster()) {
4509  Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4510  thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4511  if (port > 0) thisurl += TString::Format(":%d", port);
4512  } else {
4513  thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4514  }
4515  }
4516 
      // Notification branch: taken when the reason is neither a planned
      // termination nor an idle timeout (or when no reason is given)
4517  if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4518  // Message for notification
4519  const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4520  TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4521  TString msg;
4522  msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4523  src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4524  (reason && strlen(reason)) ? reason : "unknown");
4525  Info("MarkBad", "%s", msg.Data());
4526  // Notify one level up, if the case
4527  // Add some hint for diagnostics
4528  if (gProofServ) {
4529  msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4530  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4531  } else {
4532  msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4533  }
4534  msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4535  msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4536  msg += TString::Format(" +++\n");
4537  if (gProofServ) {
4538  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4539  "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
4541  } else {
4542  msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4543  "Display(\"*\")\n\n", thisurl.Data());
4544  Printf("%s", msg.Data());
4545  }
4546  } else if (reason) {
      // Planned termination: just trace it when debugging
4547  if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4548  Info("MarkBad", "worker %s at %s:%d asked to terminate",
4549  wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4550  }
4551  }
4552 
4553  if (IsMaster() && reason) {
4554  if (strcmp(reason, kPROOF_TerminateWorker)) {
4555  // if the reason was not a planned termination
      // Make sure a "MissingFiles" list exists in the output: it is passed to
      // the packetizer below
4556  TList *listOfMissingFiles = 0;
4557  if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4558  listOfMissingFiles = new TList();
4559  listOfMissingFiles->SetName("MissingFiles");
4560  if (fPlayer)
4561  fPlayer->AddOutputObject(listOfMissingFiles);
4562  }
4563  // If a query is being processed, assume that the work done by
4564  // the worker was lost and needs to be reassigned.
4565  TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4566  if (packetizer) {
4567  // the worker was lost so do resubmit the packets
4568  packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4569  }
4570  } else {
4571  // Tell the coordinator that we are gone
      // Only the part of the ordinal after the last '.' is passed on
4572  if (gProofServ) {
4573  TString ord(wrk->GetOrdinal());
4574  Int_t id = ord.Last('.');
4575  if (id != kNPOS) ord.Remove(0, id+1);
4576  gProofServ->ReleaseWorker(ord.Data());
4577  }
4578  }
4579  } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4580  // We are invalid after this
4581  fValid = kFALSE;
4582  }
4583 
      // The worker is no longer active nor monitored
4584  fActiveSlaves->Remove(wrk);
4585  FindUniqueSlaves();
4586 
4587  fAllMonitor->Remove(wrk->GetSocket());
4588  fActiveMonitor->Remove(wrk->GetSocket());
4589 
4591 
4592  if (IsMaster()) {
4593  if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4594  // if the reason was a planned termination then delete the worker and
4595  // remove it from all the lists
4596  fSlaves->Remove(wrk);
4597  fBadSlaves->Remove(wrk);
4598  fActiveSlaves->Remove(wrk);
4599  fInactiveSlaves->Remove(wrk);
4600  fUniqueSlaves->Remove(wrk);
4601  fAllUniqueSlaves->Remove(wrk);
4602  fNonUniqueMasters->Remove(wrk);
4603 
4604  // we add it to the list of terminated slave infos instead, so that it
4605  // stays available in the .workers persistent file
4606  TSlaveInfo *si = new TSlaveInfo(
4607  wrk->GetOrdinal(),
4608  Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4609  0, "", wrk->GetWorkDir());
4611  else delete si;
4612 
4613  delete wrk;
4614  } else {
      // Unplanned: keep the worker object, move it to the list of bad workers
      // and close it
4615  fBadSlaves->Add(wrk);
4616  fActiveSlaves->Remove(wrk);
4617  fUniqueSlaves->Remove(wrk);
4618  fAllUniqueSlaves->Remove(wrk);
4619  fNonUniqueMasters->Remove(wrk);
4621  wrk->Close();
4622  // Update the mergers count, if needed
4623  if (fMergersSet) {
4624  Int_t mergersCount = -1;
4625  TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4626  if (mc) mergersCount = mc->GetVal(); // Value set by user
4627  // Mergers count is set dynamically: recalculate it
4628  if (mergersCount == 0) {
      // Rounded square root of the number of active workers, falling back to
      // the truncated square root if that leaves fewer than 2 workers per merger
4630  if (activeWorkers > 1) {
4631  fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4632  if (activeWorkers / fMergersCount < 2)
4633  fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4634  }
4635  }
4636  }
4637  }
4638 
4639  // Update session workers files
4640  SaveWorkerInfo();
4641  } else {
4642  // On clients the proof session should be removed from the lists
4643  // and deleted, since it is not valid anymore
4644  fSlaves->Remove(wrk);
4645  if (fManager)
4646  fManager->DiscardSession(this);
4647  }
4648 }
4649 
4650 ////////////////////////////////////////////////////////////////////////////////
4651 /// Add slave with socket s to the bad slave list and remove if from
4652 /// the active list and from the two monitor objects.
4653 
4654 void TProof::MarkBad(TSocket *s, const char *reason)
4655 {
4656  std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4657 
4658  // We may have been invalidated in the meanwhile: nothing to do in such a case
4659  if (!IsValid()) return;
4660 
4661  TSlave *wrk = FindSlave(s);
4662  MarkBad(wrk, reason);
4663 }
4664 
4665 ////////////////////////////////////////////////////////////////////////////////
4666 /// Ask an active worker 'wrk' to terminate, i.e. to shutdown
4667 
4669 {
      // NOTE(review): listing lines 4668 and 4686 are absent from this copy -
      // presumably the function signature (taking the worker 'wrk') and the
      // statement announced by the last comment below; confirm against the
      // original file.
4670  if (!wrk) {
4671  Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4672  return;
4673  }
4674 
4675  // Send stop message
      // Only possible if the worker still has a valid socket
4676  if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4677  TMessage mess(kPROOF_STOP);
4678  wrk->GetSocket()->Send(mess);
4679  } else {
4680  if (gDebug > 0)
4681  Info("TerminateWorker", "connection to worker is already down: cannot"
4682  " send termination message");
4683  }
4684 
4685  // This is a bad worker from now on
4687 }
4688 
4689 ////////////////////////////////////////////////////////////////////////////////
4690 /// Ask an active worker 'ord' to terminate, i.e. to shutdown
4691 
4692 void TProof::TerminateWorker(const char *ord)
4693 {
4694  if (ord && strlen(ord) > 0) {
4695  Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
4696  if (IsMaster()) {
4697  TIter nxw(fSlaves);
4698  TSlave *wrk = 0;
4699  while ((wrk = (TSlave *)nxw())) {
4700  if (all || !strcmp(wrk->GetOrdinal(), ord)) {
4701  TerminateWorker(wrk);
4702  if (!all) break;
4703  }
4704  }
4705  } else {
4706  TMessage mess(kPROOF_STOP);
4707  mess << TString(ord);
4708  Broadcast(mess);
4709  }
4710  }
4711 }
4712 
4713 ////////////////////////////////////////////////////////////////////////////////
4714 /// Ping PROOF. Returns 1 if master server responded.
4715 
4717 {
      // NOTE(review): listing line 4716 (presumably the function signature) is
      // absent from this copy; confirm against the original file.
      // Forwards to the Ping variant taking a list selector, using kActive.
4718  return Ping(kActive);
4719 }
4720 
4721 ////////////////////////////////////////////////////////////////////////////////
4722 /// Ping PROOF slaves. Returns the number of slaves that responded.
4723 
4725 {
      // NOTE(review): listing line 4724 (presumably the function signature,
      // taking the selector 'list') is absent from this copy; confirm against
      // the original file.

      // Select the list of slaves to ping.
      // NOTE(review): 'slaves' stays null if 'list' is none of the four
      // selectors below, and is dereferenced right after - confirm that no
      // other selector value can be passed.
4726  TList *slaves = 0;
4727  if (list == kAll) slaves = fSlaves;
4728  if (list == kActive) slaves = fActiveSlaves;
4729  if (list == kUnique) slaves = fUniqueSlaves;
4730  if (list == kAllUnique) slaves = fAllUniqueSlaves;
4731 
4732  if (slaves->GetSize() == 0) return 0;
4733 
4734  int nsent = 0;
4735  TIter next(slaves);
4736 
      // Ping each valid slave: a slave whose Ping() returns -1 is marked bad,
      // the others are counted
4737  TSlave *sl;
4738  while ((sl = (TSlave *)next())) {
4739  if (sl->IsValid()) {
4740  if (sl->Ping() == -1) {
4741  MarkBad(sl, "ping unsuccessful");
4742  } else {
4743  nsent++;
4744  }
4745  }
4746  }
4747 
4748  return nsent;
4749 }
4750 
4751 ////////////////////////////////////////////////////////////////////////////////
4752 /// Ping PROOF slaves. Returns the number of slaves that responded.
4753 
4755 {
      // NOTE(review): listing line 4754 (presumably the function signature) is
      // absent from this copy; confirm against the original file.
4756  TList *slaves = fSlaves;
4757 
4758  if (slaves->GetSize() == 0) return;
4759 
4760  TIter next(slaves);
4761 
      // Call Touch() on every valid entry of fSlaves
4762  TSlave *sl;
4763  while ((sl = (TSlave *)next())) {
4764  if (sl->IsValid()) {
4765  sl->Touch();
4766  }
4767  }
4768 
4769  return;
4770 }
4771 
4772 ////////////////////////////////////////////////////////////////////////////////
4773 /// Print status of PROOF cluster.
4774 
4775 void TProof::Print(Option_t *option) const
4776 {
      // NOTE(review): listing lines 4786, 4809 and 4827 are absent from this
      // copy - presumably the last argument of the two "Architecture-Compiler"
      // Printf calls and the arguments of the "parallel mode" Printf; confirm
      // against the original file.
      //
      // Two layouts: the one for a client session (kIsClient bit set) and the
      // one used otherwise, which reads its identity from gProofServ. The
      // object is const here, hence the const_cast's for the calls which need
      // a non-const TProof.
4777  TString secCont;
4778 
4779  if (TestBit(TProof::kIsClient)) {
4780  Printf("Connected to: %s (%s)", GetMaster(),
4781  IsValid() ? "valid" : "invalid");
4782  Printf("Port number: %d", GetPort());
4783  Printf("User: %s", GetUser());
4784  Printf("ROOT version|rev: %s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4785  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
      // Connection details are taken from the first active slave, if any
4787  TSlave *sl = (TSlave *)fActiveSlaves->First();
4788  if (sl) {
4789  TString sc;
4790  if (sl->GetSocket()->GetSecContext())
4791  Printf("Security context: %s",
4792  sl->GetSocket()->GetSecContext()->AsString(sc));
4793  Printf("Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4794  } else {
4795  Printf("Security context: Error - No connection");
4796  Printf("Proofd protocol version: Error - No connection");
4797  }
4798  Printf("Client protocol version: %d", GetClientProtocol());
4799  Printf("Remote protocol version: %d", GetRemoteProtocol());
4800  Printf("Log level: %d", GetLogLevel());
4801  Printf("Session unique tag: %s", IsValid() ? GetSessionTag() : "");
4802  Printf("Default data pool: %s", IsValid() ? GetDataPoolUrl() : "");
      // On a valid session the print request is also sent on via SendPrint()
4803  if (IsValid())
4804  const_cast<TProof*>(this)->SendPrint(option);
4805  } else {
4806  const_cast<TProof*>(this)->AskStatistics();
4807  if (IsParallel())
4808  Printf("*** Master server %s (parallel mode, %d workers):",
4810  else
4811  Printf("*** Master server %s (sequential mode):",
4812  gProofServ->GetOrdinal());
4813 
4814  Printf("Master host name: %s", gSystem->HostName());
4815  Printf("Port number: %d", GetPort());
4816  if (strlen(gProofServ->GetGroup()) > 0) {
4817  Printf("User/Group: %s/%s", GetUser(), gProofServ->GetGroup());
4818  } else {
4819  Printf("User: %s", GetUser());
4820  }
      // The ROOTVERSIONTAG environment variable, when set, replaces the git
      // commit in the version string
4821  TString ver;
4822  ver.Form("%s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4823  if (gSystem->Getenv("ROOTVERSIONTAG"))
4824  ver.Form("%s|%s", gROOT->GetVersion(), gSystem->Getenv("ROOTVERSIONTAG"));
4825  Printf("ROOT version|rev|tag: %s", ver.Data());
4826  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
4828  Printf("Protocol version: %d", GetClientProtocol());
4829  Printf("Image name: %s", GetImage());
4830  Printf("Working directory: %s", gSystem->WorkingDirectory());
4831  Printf("Config directory: %s", GetConfDir());
4832  Printf("Config file: %s", GetConfFile());
4833  Printf("Log level: %d", GetLogLevel());
4834  Printf("Number of workers: %d", GetNumberOfSlaves());
4835  Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
4836  Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
4837  Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
4838  Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
4839  Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
4840  Printf("Total real time used (s): %.3f", GetRealTime());
4841  Printf("Total CPU time used (s): %.3f", GetCpuTime());
      // With an option containing 'a' (any case) the valid slaves are listed
      // too: entries of type kSlave are printed directly, entries of type
      // kMaster are sent a kPROOF_PRINT request and collected afterwards
4842  if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
4843  Printf("List of workers:");
4844  TList masters;
4845  TIter nextslave(fSlaves);
4846  while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4847  if (!sl->IsValid()) continue;
4848 
4849  if (sl->GetSlaveType() == TSlave::kSlave) {
4850  sl->Print(option);
4851  } else if (sl->GetSlaveType() == TSlave::kMaster) {
4852  TMessage mess(kPROOF_PRINT);
4853  mess.WriteString(option);
4854  if (sl->GetSocket()->Send(mess) == -1)
4855  const_cast<TProof*>(this)->MarkBad(sl, "could not send kPROOF_PRINT request");
4856  else
4857  masters.Add(sl);
4858  } else {
4859  Error("Print", "TSlave is neither Master nor Worker");
4860  R__ASSERT(0);
4861  }
4862  }
4863  const_cast<TProof*>(this)->Collect(&masters, fCollectTimeout);
4864  }
4865  }
4866 }
4867 
4868 ////////////////////////////////////////////////////////////////////////////////
4869 /// Extract from opt information about output handling settings.
4870 /// The understood keywords are:
4871 /// of=<file>, outfile=<file> output file location
4872 /// ds=<dsname>, dataset=<dsname> dataset name ('of' and 'ds' are
4873 /// mutually exclusive,execution stops
4874 /// if both are found)
4875 /// stf[=<opt>], savetofile[=<opt>] control saving to file
4876 ///
4877 /// For 'stf', the <opt> integer has the following meaning:
4878 /// <opt> = <how>*10 + <force>
4879 /// <force> = 0 save to file if memory threshold is reached
4880 /// (the memory threshold is set by the cluster
4881 /// admin); in case an output file is defined, the
4882 /// files are merged at the end;
4883 /// 1 save results to file.
4884 /// <how> = 0 save at the end of the query
4885 /// 1 save results after each packet (to reduce the
4886 /// loss in case of crash).
4887 ///
4888 /// Setting 'ds' automatically sets 'stf=1'; it is still possible to set 'stf=11'
4889 /// to save results after each packet.
4890 ///
4891 /// The separator from the next option is either a ' ' or a ';'
4892 ///
4893 /// All recognized settings are removed from the input string opt.
4894 /// If action == 0, set up the output file accordingly, if action == 1 clean related
4895 /// output file settings.
4896 /// If the final target file is local then 'target' is set to the final local path
4897 /// when action == 0 and used to retrieve the file with TFile::Cp when action == 1.
4898 ///
4899 /// Output file settings are in the form
4900 ///
4901 /// <previous_option>of=name <next_option>
4902 /// <previous_option>outfile=name,...;<next_option>
4903 ///
4904 /// The separator from the next option is either a ' ' or a ';'
4905 /// Called internally by TProof::Process.
4906 ///
4907 /// Returns 0 on success, -1 on error.
4908 
4910 {
4911  TString outfile, dsname, stfopt;
4912  if (action == 0) {
4913  TString tagf, tagd, tags, oo;
4914  Ssiz_t from = 0, iof = kNPOS, iod = kNPOS, ios = kNPOS;
4915  while (opt.Tokenize(oo, from, "[; ]")) {
4916  if (oo.BeginsWith("of=")) {
4917  tagf = "of=";
4918  iof = opt.Index(tagf);
4919  } else if (oo.BeginsWith("outfile=")) {
4920  tagf = "outfile=";
4921  iof = opt.Index(tagf);
4922  } else if (oo.BeginsWith("ds")) {
4923  tagd = "ds";
4924  iod = opt.Index(tagd);
4925  } else if (oo.BeginsWith("dataset")) {
4926  tagd = "dataset";
4927  iod = opt.Index(tagd);
4928  } else if (oo.BeginsWith("stf")) {
4929  tags = "stf";
4930  ios = opt.Index(tags);
4931  } else if (oo.BeginsWith("savetofile")) {
4932  tags = "savetofile";
4933  ios = opt.Index(tags);
4934  }
4935  }
4936  // Check consistency
4937  if (iof != kNPOS && iod != kNPOS) {
4938  Error("HandleOutputOptions", "options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4939  return -1;
4940  }
4941 
4942  // Check output file first
4943  if (iof != kNPOS) {
4944  from = iof + tagf.Length();
4945  if (!opt.Tokenize(outfile, from, "[; ]") || outfile.IsNull()) {
4946  Error("HandleOutputOptions", "could not extract output file settings string! (%s)", opt.Data());
4947  return -1;
4948  }
4949  // For removal from original options string
4950  tagf += outfile;
4951  }
4952  // Check dataset
4953  if (iod != kNPOS) {
4954  from = iod + tagd.Length();
4955  if (!opt.Tokenize(dsname, from, "[; ]"))
4956  if (gDebug > 0) Info("HandleOutputOptions", "no dataset name found: use default");
4957  // For removal from original options string
4958  tagd += dsname;
4959  // The name may be empty or beginning with a '='
4960  if (dsname.BeginsWith("=")) dsname.Replace(0, 1, "");
4961  if (dsname.Contains("|V")) {
4962  target = "ds|V";
4963  dsname.ReplaceAll("|V", "");
4964  }
4965  if (dsname.IsNull()) dsname = "dataset_<qtag>";
4966  }
4967  // Check stf
4968  if (ios != kNPOS) {
4969  from = ios + tags.Length();
4970  if (!opt.Tokenize(stfopt, from, "[; ]"))
4971  if (gDebug > 0) Info("HandleOutputOptions", "save-to-file not found: use default");
4972  // For removal from original options string
4973  tags += stfopt;
4974  // It must be digit
4975  if (!stfopt.IsNull()) {
4976  if (stfopt.BeginsWith("=")) stfopt.Replace(0,1,"");
4977  if (!stfopt.IsNull()) {
4978  if (!stfopt.IsDigit()) {
4979  Error("HandleOutputOptions", "save-to-file option must be a digit! (%s)", stfopt.Data());
4980  return -1;
4981  }
4982  } else {
4983  // Default
4984  stfopt = "1";
4985  }
4986  } else {
4987  // Default
4988  stfopt = "1";
4989  }
4990  }
4991  // Remove from original options string
4992  opt.ReplaceAll(tagf, "");
4993  opt.ReplaceAll(tagd, "");
4994  opt.ReplaceAll(tags, "");
4995  }
4996 
4997  // Parse now
4998  if (action == 0) {
4999  // Output file
5000  if (!outfile.IsNull()) {
5001  if (!outfile.BeginsWith("master:")) {
5003  Warning("HandleOutputOptions",
5004  "directory '%s' for the output file does not exists or is not writable:"
5005  " saving to master", gSystem->DirName(outfile.Data()));
5006  outfile.Form("master:%s", gSystem->BaseName(outfile.Data()));
5007  } else {
5008  if (!IsLite()) {
5009  // The target file is local, so we need to retrieve it
5010  target = outfile;
5011  if (!stfopt.IsNull()) {
5012  outfile.Form("master:%s", gSystem->BaseName(target.Data()));
5013  } else {
5014  outfile = "";
5015  }
5016  }
5017  }
5018  }
5019  if (outfile.BeginsWith("master:")) {
5020  outfile.ReplaceAll("master:", "");
5021  if (outfile.IsNull() || !gSystem->IsAbsoluteFileName(outfile)) {
5022  // Get the master data dir
5023  TString ddir, emsg;
5024  if (!IsLite()) {
5025  if (Exec("gProofServ->GetDataDir()", "0", kTRUE) == 0) {
5026  TObjString *os = fMacroLog.GetLineWith("const char");
5027  if (os) {
5028  Ssiz_t fst = os->GetString().First('\"');
5029  Ssiz_t lst = os->GetString().Last('\"');
5030  ddir = os->GetString()(fst+1, lst-fst-1);
5031  } else {
5032  emsg = "could not find 'const char *' string in macro log! cannot continue";
5033  }
5034  } else {
5035  emsg = "could not retrieve master data directory info! cannot continue";
5036  }
5037  if (!emsg.IsNull()) {
5038  Error("HandleOutputOptions", "%s", emsg.Data());
5039  return -1;
5040  }
5041  }
5042  if (!ddir.IsNull()) ddir += "/";
5043  if (outfile.IsNull()) {
5044  outfile.Form("%s<file>", ddir.Data());
5045  } else {
5046  outfile.Insert(0, TString::Format("%s", ddir.Data()));
5047  }
5048  }
5049  }
5050  // Set the parameter
5051  if (!outfile.IsNull()) {
5052  if (!outfile.BeginsWith("of:")) outfile.Insert(0, "of:");
5053  SetParameter("PROOF_DefaultOutputOption", outfile.Data());
5054  }
5055  }
5056  // Dataset creation
5057  if (!dsname.IsNull()) {
5058  dsname.Insert(0, "ds:");
5059  // Set the parameter
5060  SetParameter("PROOF_DefaultOutputOption", dsname.Data());
5061  // Check the Save-To-File option
5062  if (!stfopt.IsNull()) {
5063  Int_t ostf = (Int_t) stfopt.Atoi();
5064  if (ostf%10 <= 0) {
5065  Warning("HandleOutputOptions", "Dataset required bu Save-To-File disabled: enabling!");
5066  stfopt.Form("%d", ostf+1);
5067  }
5068  } else {
5069  // Minimal setting
5070  stfopt = "1";
5071  }
5072  }
5073  // Save-To-File options
5074  if (!stfopt.IsNull()) {
5075  // Set the parameter
5076  SetParameter("PROOF_SavePartialResults", (Int_t) stfopt.Atoi());
5077  }
5078  } else {
5079  // Retrieve the file, if required
5080  if (GetOutputList()) {
5081  if (target == "ds|V") {
5082  // Find the dataset
5083  dsname = "";
5084  TIter nxo(GetOutputList());
5085  TObject *o = 0;
5086  while ((o = nxo())) {
5088  VerifyDataSet(o->GetName());
5089  dsname = o->GetName();
5090  break;
5091  }
5092  }
5093  if (!dsname.IsNull()) {
5094  TFileCollection *fc = GetDataSet(dsname);
5095  if (fc) {
5096  fc->Print();
5097  } else {
5098  Warning("HandleOutputOptions", "could not retrieve TFileCollection for dataset '%s'", dsname.Data());
5099  }
5100  } else {
5101  Warning("HandleOutputOptions", "dataset not found!");
5102  }
5103  } else {
5104  Bool_t targetcopied = kFALSE;
5105  TProofOutputFile *pf = 0;
5106  if (!target.IsNull())
5108  if (pf) {
5109  // Copy the file
5110  if (strcmp(TUrl(pf->GetOutputFileName(), kTRUE).GetUrl(),
5111  TUrl(target, kTRUE).GetUrl())) {
5112  if (TFile::Cp(pf->GetOutputFileName(), target)) {
5113  Printf(" Output successfully copied to %s", target.Data());
5114  targetcopied = kTRUE;
5115  } else {
5116  Warning("HandleOutputOptions", "problems copying output to %s", target.Data());
5117  }
5118  }
5119  }
5120  TFile *fout = 0;
5121  TObject *o = 0;
5122  TIter nxo(GetOutputList());
5123  Bool_t swapcopied = kFALSE;
5124  while ((o = nxo())) {
5125  TProofOutputFile *pof = dynamic_cast<TProofOutputFile *>(o);
5126  if (pof) {
5127  if (pof->TestBit(TProofOutputFile::kSwapFile) && !target.IsNull()) {
5128  if (pof == pf && targetcopied) continue;
5129  // Copy the file
5130  if (strcmp(TUrl(pf->GetOutputFileName(), kTRUE).GetUrl(),
5131  TUrl(target, kTRUE).GetUrl())) {
5132  if (TFile::Cp(pof->GetOutputFileName(), target)) {
5133  Printf(" Output successfully copied to %s", target.Data());
5134  swapcopied = kTRUE;
5135  } else {
5136  Warning("HandleOutputOptions", "problems copying output to %s", target.Data());
5137  }
5138  }
5139  } else if (pof->IsRetrieve()) {
5140  // Retrieve this file to the local path indicated in the title
5141  if (strcmp(TUrl(pf->GetOutputFileName(), kTRUE).GetUrl(),
5142  TUrl(pof->GetTitle(), kTRUE).GetUrl())) {
5143  if (TFile::Cp(pof->GetOutputFileName(), pof->GetTitle())) {
5144  Printf(" Output successfully copied to %s", pof->GetTitle());
5145  } else {
5146  Warning("HandleOutputOptions",
5147  "problems copying %s to %s", pof->GetOutputFileName(), pof->GetTitle());
5148  }
5149  }
5150  }
5151  }
5152  }
5153  if (!target.IsNull() && !swapcopied) {
5154  if (!fout && !pf) {
5155  fout = TFile::Open(target, "RECREATE");
5156  if (!fout || (fout && fout->IsZombie())) {
5157  SafeDelete(fout);
5158  Warning("HandleOutputOptions", "problems opening output file %s", target.Data());
5159  }
5160  }
5161  if (fout) {
5162  nxo.Reset();
5163  while ((o = nxo())) {
5164  TProofOutputFile *pof = dynamic_cast<TProofOutputFile *>(o);
5165  if (!pof) {
5166  // Write the object to the open output file
5167  o->Write();
5168  }
5169  }
5170  }
5171  }
5172  // Clean-up
5173  if (fout) {
5174  fout->Close();
5175  SafeDelete(fout);
5176  Printf(