Logo ROOT  
Reference Guide
TProof.cxx
Go to the documentation of this file.
1// @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2// Author: Fons Rademakers 13/02/97
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/**
13\defgroup proof PROOF
14
15Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
16
17\deprecated
18We keep PROOF for those who still need it for legacy use cases.
19PROOF is no longer developed and receives only limited support.
20%ROOT moved to RDataFrame and related products as its multi-core/multi-processing engines several years ago.
21
22*/
23
24/**
25\defgroup proofkernel PROOF kernel Libraries
26\ingroup proof
27
28The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
29the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
30and handling of results.
31
32*/
33
34/** \class TProof
35\ingroup proofkernel
36
37This class controls a Parallel ROOT Facility, PROOF, cluster.
38It fires the worker servers, it keeps track of how many workers are
39running, it keeps track of the workers running status, it broadcasts
40messages to all workers, it collects results, etc.
41
42*/
43
44#include <stdlib.h>
45#include <fcntl.h>
46#include <errno.h>
47#ifdef WIN32
48# include <io.h>
49# include <sys/stat.h>
50# include <sys/types.h>
51# include "snprintf.h"
52#else
53# include <unistd.h>
54#endif
55#include <vector>
56
57#include "RConfigure.h"
58#include "Riostream.h"
59#include "Getline.h"
60#include "TBrowser.h"
61#include "TChain.h"
62#include "TCondor.h"
63#include "TDSet.h"
64#include "TError.h"
65#include "TEnv.h"
66#include "TEntryList.h"
67#include "TEventList.h"
68#include "TFile.h"
69#include "TFileInfo.h"
70#include "TFTP.h"
71#include "THashList.h"
72#include "TInterpreter.h"
73#include "TKey.h"
74#include "TMap.h"
75#include "TMath.h"
76#include "TMessage.h"
77#include "TMonitor.h"
78#include "TObjArray.h"
79#include "TObjString.h"
80#include "TParameter.h"
81#include "TProof.h"
82#include "TProofNodeInfo.h"
83#include "TProofOutputFile.h"
84#include "TVirtualProofPlayer.h"
85#include "TVirtualPacketizer.h"
86#include "TProofServ.h"
87#include "TPluginManager.h"
88#include "TQueryResult.h"
89#include "TRandom.h"
90#include "TRegexp.h"
91#include "TROOT.h"
92#include "TSlave.h"
93#include "TSocket.h"
94#include "TSortedList.h"
95#include "TSystem.h"
96#include "TTree.h"
97#include "TUrl.h"
98#include "TFileCollection.h"
99#include "TDataSetManager.h"
100#include "TDataSetManagerFile.h"
101#include "TMacro.h"
102#include "TSelector.h"
103#include "TPRegexp.h"
104#include "TPackMgr.h"
105
106#include <mutex>
107
109
110// Rotating indicator
111char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
112
113TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
114TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
115
117
118//----- PROOF Interrupt signal handler -----------------------------------------
119////////////////////////////////////////////////////////////////////////////////
120/// TProof interrupt handler.
121
123{
124 if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
125
126 // Cannot ask the user : abort any remote processing
128
129 } else {
130 // Real stop or request to switch to asynchronous?
131 const char *a = 0;
132 if (fProof->GetRemoteProtocol() < 22) {
133 a = Getline("\nSwitch to asynchronous mode not supported remotely:"
134 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
135 } else {
136 a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
137 " any other key to continue: ");
138 }
139 if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
140
141 Info("Notify","Processing interrupt signal ... %c", a[0]);
142
143 // Stop or abort any remote processing
144 Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
145 fProof->StopProcess(abort);
146
147 } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
148 // Stop any remote processing
150 }
151 }
152
153 return kTRUE;
154}
155
156//----- Input handler for messages from TProofServ -----------------------------
157////////////////////////////////////////////////////////////////////////////////
158/// Constructor
159
161 : TFileHandler(s->GetDescriptor(),1),
162 fSocket(s), fProof(p)
163{
164}
165
166////////////////////////////////////////////////////////////////////////////////
167/// Handle input
168
170{
172 return kTRUE;
173}
174
175
176//------------------------------------------------------------------------------
177
179
180////////////////////////////////////////////////////////////////////////////////
181/// Used to sort slaveinfos by ordinal.
182
184{
185 if (!obj) return 1;
186
187 const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
188
189 if (!si) return fOrdinal.CompareTo(obj->GetName());
190
191 const char *myord = GetOrdinal();
192 const char *otherord = si->GetOrdinal();
193 while (myord && otherord) {
194 Int_t myval = atoi(myord);
195 Int_t otherval = atoi(otherord);
196 if (myval < otherval) return 1;
197 if (myval > otherval) return -1;
198 myord = strchr(myord, '.');
199 if (myord) myord++;
200 otherord = strchr(otherord, '.');
201 if (otherord) otherord++;
202 }
203 if (myord) return -1;
204 if (otherord) return 1;
205 return 0;
206}
207
208////////////////////////////////////////////////////////////////////////////////
209/// Used to compare slaveinfos by ordinal.
210
212{
213 if (!obj) return kFALSE;
214 const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
215 if (!si) return kFALSE;
216 return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
217}
218
219////////////////////////////////////////////////////////////////////////////////
220/// Print slave info. If opt = "active" print only the active
221/// slaves, if opt="notactive" print only the not active slaves,
222/// if opt = "bad" print only the bad slaves, else
223/// print all slaves.
224
226{
227 TString stat = fStatus == kActive ? "active" :
228 fStatus == kBad ? "bad" :
229 "not active";
230
231 Bool_t newfmt = kFALSE;
232 TString oo(opt);
233 if (oo.Contains("N")) {
234 newfmt = kTRUE;
235 oo.ReplaceAll("N","");
236 }
237 if (oo == "active" && fStatus != kActive) return;
238 if (oo == "notactive" && fStatus != kNotActive) return;
239 if (oo == "bad" && fStatus != kBad) return;
240
241 if (newfmt) {
242 TString msd, si, datadir;
243 if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
244 if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
245 if (fSysInfo.fCpus > 0) {
246 si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
248 } else {
249 si.Form("| %s", fHostName.Data());
250 }
251 Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
252
253 } else {
254 TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
255
256 std::cout << "Slave: " << fOrdinal
257 << " hostname: " << fHostName
258 << " msd: " << msd
259 << " perf index: " << fPerfIndex
260 << " " << stat
261 << std::endl;
262 }
263}
264
265////////////////////////////////////////////////////////////////////////////////
266/// Setter for fSysInfo
267
269{
270 fSysInfo.fOS = si.fOS; // OS
271 fSysInfo.fModel = si.fModel; // computer model
272 fSysInfo.fCpuType = si.fCpuType; // type of cpu
273 fSysInfo.fCpus = si.fCpus; // number of cpus
274 fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
275 fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
276 fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
277 fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
278}
279
281
282//------------------------------------------------------------------------------
283
284////////////////////////////////////////////////////////////////////////////////
285/// Destructor
286
288{
289 // Just delete the list, the objects are owned by other list
290 if (fWorkers) {
293 }
294}
295////////////////////////////////////////////////////////////////////////////////
296/// Increase number of already merged workers by 1
297
299{
301 Error("SetMergedWorker", "all workers have been already merged before!");
302 else
304}
305
306////////////////////////////////////////////////////////////////////////////////
307/// Add new worker to the list of workers to be merged by this merger
308
310{
311 if (!fWorkers)
312 fWorkers = new TList();
313 if (fWorkersToMerge == fWorkers->GetSize()) {
314 Error("AddWorker", "all workers have been already assigned to this merger");
315 return;
316 }
317 fWorkers->Add(sl);
318}
319
320////////////////////////////////////////////////////////////////////////////////
321/// Return if merger has already merged all workers, i.e. if it has finished its merging job
322
324{
326}
327
328////////////////////////////////////////////////////////////////////////////////
329/// Return if the determined number of workers has been already assigned to this merger
330
332{
333 if (!fWorkers)
334 return kFALSE;
335
336 return (fWorkers->GetSize() == fWorkersToMerge);
337}
338
339////////////////////////////////////////////////////////////////////////////////
340/// This is a private API function.
341/// It checks whether the connection string contains a PoD cluster protocol.
342/// If it does, then the connection string will be changed to reflect
343/// a real PROOF connection string for a PROOF cluster managed by PoD.
344/// PoD: http://pod.gsi.de .
345/// Return -1 if the PoD request failed; return 0 otherwise.
346
347static Int_t PoDCheckUrl(TString *_cluster)
348{
349 if ( !_cluster )
350 return 0;
351
352 // trim spaces from both sides of the string
353 *_cluster = _cluster->Strip( TString::kBoth );
354 // PoD protocol string
355 const TString pod_prot("pod");
356
357 // URL test
358 // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
359 TUrl url( _cluster->Data() );
360 if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
361 return 0;
362
363 // PoD cluster is used
364 // call pod-info in a batch mode (-b).
365 // pod-info will find either a local PoD cluster or
366 // a remote one, manged by pod-remote.
367 *_cluster = gSystem->GetFromPipe("pod-info -c -b");
368 if( 0 == _cluster->Length() ) {
369 Error("PoDCheckUrl", "PoD server is not running");
370 return -1;
371 }
372 return 0;
373}
374
375////////////////////////////////////////////////////////////////////////////////
376/// Create a PROOF environment. Starting PROOF involves either connecting
377/// to a master server, which in turn will start a set of slave servers, or
378/// directly starting as master server (if master = ""). Masterurl is of
379/// the form: [proof[s]://]host[:port]. Conffile is the name of the config
380/// file describing the remote PROOF cluster (this argument allows you to
381/// describe different cluster configurations).
382/// The default is proof.conf. Confdir is the directory where the config
383/// file and other PROOF related files are (like motd and noproof files).
384/// Loglevel is the log level (default = 1). User specified custom config
385/// files will be first looked for in $HOME/.conffile.
386
387TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
388 Int_t loglevel, const char *alias, TProofMgr *mgr)
389 : fUrl(masterurl)
390{
 // NOTE(review): several statements of this constructor are absent from this
 // listing (the numbering jumps); the notes below describe only what is visible.
391 // Default initializations
392 InitMembers();
393
394 // This may be needed during init
395 fManager = mgr;
396
397 // Default server type
399
400 // Default query mode
402
403 // Parse the main URL, adjusting the missing fields and setting the relevant
404 // bits
407
408 // Protocol and Host
 // A null or empty URL gets the "proof" protocol and the placeholder host
 // "__master__"; a URL without "://" only gets the "proof" protocol.
409 if (!masterurl || strlen(masterurl) <= 0) {
410 fUrl.SetProtocol("proof");
411 fUrl.SetHost("__master__");
412 } else if (!(strstr(masterurl, "://"))) {
413 fUrl.SetProtocol("proof");
414 }
415 // Port
 // If the port equals the one TUrl reports for a blank URL, replace it with
 // the one TUrl reports for a bare "proof://" URL.
416 if (fUrl.GetPort() == TUrl(" ").GetPort())
417 fUrl.SetPort(TUrl("proof:// ").GetPort());
418
419 // Make sure to store the FQDN, so to get a solid reference for subsequent checks
 // The placeholder host is stored in fMaster as is.
 // NOTE(review): the statements of the two other branches are not visible here.
420 if (!strcmp(fUrl.GetHost(), "__master__"))
421 fMaster = fUrl.GetHost();
422 else if (!strlen(fUrl.GetHost()))
424 else
426
427 // Server type
 // A leading "std" or "lite" in the URL options is consumed here: the prefix
 // is removed and the remaining options are written back to the URL.
428 if (strlen(fUrl.GetOptions()) > 0) {
429 TString opts(fUrl.GetOptions());
430 if (!(strncmp(fUrl.GetOptions(),"std",3))) {
432 opts.Remove(0,3);
433 fUrl.SetOptions(opts.Data());
434 } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
436 opts.Remove(0,4);
437 fUrl.SetOptions(opts.Data());
438 }
439 }
440
441 // Instance type
445 if (fMaster == "__master__") {
449 } else if (fMaster == "prooflite") {
450 // Client and master are merged
453 }
454 // Flag that we are a client
 // ROOTPROOFCLIENT is only defined (as an empty string) when not already set
456 if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
457
 // The return value of Init() is not checked here
458 Init(masterurl, conffile, confdir, loglevel, alias);
459
460 // If the user was not set, get it from the master
461 if (strlen(fUrl.GetUser()) <= 0) {
462 TString usr, emsg;
 // Run the expression remotely, then look in the macro log for the line
 // containing "const char"; the user name is the text strictly between the
 // first and the last double quote on that line.
463 if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
464 TObjString *os = fMacroLog.GetLineWith("const char");
465 if (os) {
466 Ssiz_t fst = os->GetString().First('\"');
467 Ssiz_t lst = os->GetString().Last('\"');
468 usr = os->GetString()(fst+1, lst-fst-1);
469 } else {
470 emsg = "could not find 'const char *' string in macro log";
471 }
472 } else {
473 emsg = "could not retrieve user info";
474 }
 // Fallback used when either step above failed.
 // NOTE(review): the declaration of 'pw' is missing from this listing;
 // presumably it holds the local user info - confirm against the full source.
475 if (!emsg.IsNull()) {
476 // Get user logon name
478 if (pw) {
479 usr = pw->fUser;
480 delete pw;
481 }
482 Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
483 }
484 // Set the user name in the main URL
485 fUrl.SetUser(usr.Data());
486 }
487
488 // If called by a manager, make sure it stays in last position
489 // for cleaning
 // (done by removing the manager from the socket list and adding it again)
490 if (mgr) {
492 gROOT->GetListOfSockets()->Remove(mgr);
493 gROOT->GetListOfSockets()->Add(mgr);
494 }
495
496 // Old-style server type: we add this to the list and set the global pointer
 // NOTE(review): the condition guarding the next statement is not visible here
498 if (!gROOT->GetListOfProofs()->FindObject(this))
499 gROOT->GetListOfProofs()->Add(this);
500
501 // Still needed by the packetizers: needs to be changed
502 gProof = this;
503}
504
505////////////////////////////////////////////////////////////////////////////////
506/// Protected constructor to be used by classes deriving from TProof
507/// (they have to call Init themselves and override StartSlaves
508/// appropriately).
509///
510/// This constructor simply registers the instance in the global list of
511/// PROOF sessions (if not already there) and sets gProof to this instance.
512
513TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
514{
515 // Default initializations
516 InitMembers();
517
518 if (!gROOT->GetListOfProofs()->FindObject(this))
519 gROOT->GetListOfProofs()->Add(this);
520
521 gProof = this;
522}
523
524////////////////////////////////////////////////////////////////////////////////
525/// Default initializations
526
528{
529 fValid = kFALSE;
530 fTty = kFALSE;
531 fRecvMessages = 0;
532 fSlaveInfo = 0;
537 fActiveSlaves = 0;
538 fInactiveSlaves = 0;
539 fUniqueSlaves = 0;
542 fActiveMonitor = 0;
543 fUniqueMonitor = 0;
545 fCurrentMonitor = 0;
546 fBytesRead = 0;
547 fRealTime = 0;
548 fCpuTime = 0;
549 fIntHandler = 0;
550 fProgressDialog = 0;
553 fPlayer = 0;
554 fFeedback = 0;
555 fChains = 0;
556 fDSet = 0;
557 fNotIdle = 0;
558 fSync = kTRUE;
562 fLogFileW = 0;
563 fLogFileR = 0;
566 fMacroLog.SetName("ProofLogMacro");
567
568 fWaitingSlaves = 0;
569 fQueries = 0;
570 fOtherQueries = 0;
571 fDrawQueries = 0;
572 fMaxDrawQueries = 1;
573 fSeqNum = 0;
574
575 fSessionID = -1;
577
578 fPackMgr = 0;
580
581 fInputData = 0;
582
583 fPrintProgress = 0;
584
585 fLoadedMacros = 0;
586
587 fProtocol = -1;
588 fSlaves = 0;
590 fBadSlaves = 0;
591 fAllMonitor = 0;
593 fBytesReady = 0;
594 fTotalBytes = 0;
597 fRunningDSets = 0;
598
599 fCollectTimeout = -1;
600
601 fManager = 0;
604
607 fMergers = 0;
608 fMergersCount = -1;
610 fWorkersToMerge = 0;
612
613 fPerfTree = "";
614
616
617 fSelector = 0;
618
619 fPrepTime = 0.;
620
621 // Check if the user defined a list of environment variables to send over:
622 // include them into the dedicated list
623 if (gSystem->Getenv("PROOF_ENVVARS")) {
624 TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
625 Int_t from = 0;
626 while (envs.Tokenize(env, from, ",")) {
627 if (!env.IsNull()) {
628 if (!gSystem->Getenv(env)) {
629 Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
630 } else {
631 if (!envsfound.IsNull()) envsfound += ",";
632 envsfound += env;
634 TProof::AddEnvVar(env, gSystem->Getenv(env));
635 }
636 }
637 }
638 if (envsfound.IsNull()) {
639 Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
640 } else {
641 Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
642 }
643 }
644
645 // Done
646 return;
647}
648
649////////////////////////////////////////////////////////////////////////////////
650/// Clean up PROOF environment.
651
653{
654 if (fChains) {
655 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
656 // remove "chain" from list
657 chain->SetProof(0);
658 RemoveChain(chain);
659 }
660 }
661
662 // remove links to packages enabled on the client
664 // iterate over all packages
666 if (epl) {
667 TIter nxp(epl);
668 while (TObjString *pck = (TObjString *)(nxp())) {
669 FileStat_t stat;
670 if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
671 // check if symlink, if so unlink
672 // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
673 // existing file or to a directory, but if fIsLink is true the symlink exists
674 if (stat.fIsLink)
675 gSystem->Unlink(pck->String());
676 }
677 }
678 epl->Delete();
679 delete epl;
680 }
681 }
682
683 Close();
710 if (fWrksOutputReady) {
712 delete fWrksOutputReady;
713 }
714 if (fQueries) {
715 fQueries->Delete();
716 delete fQueries;
717 }
718
719 // remove file with redirected logs
721 if (fLogFileR)
722 fclose(fLogFileR);
723 if (fLogFileW)
724 fclose(fLogFileW);
725 if (fLogFileName.Length() > 0)
727 }
728
729 // Remove for the global list
730 gROOT->GetListOfProofs()->Remove(this);
731 // ... and from the manager list
732 if (fManager && fManager->IsValid())
734
735 if (gProof && gProof == this) {
736 // Set previous as default
737 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
738 while ((gProof = (TProof *)pvp())) {
740 break;
741 }
742 }
743
744 // For those interested in our destruction ...
745 Emit("~TProof()");
746 Emit("CloseWindow()");
747}
748
749////////////////////////////////////////////////////////////////////////////////
750/// Start the PROOF environment. Starting PROOF involves either connecting
751/// to a master server, which in turn will start a set of slave servers, or
752/// directly starting as master server (if master = ""). For a description
753/// of the arguments see the TProof ctor. Returns the number of started
754/// master or slave servers, returns 0 in case of error, in which case
755/// fValid remains false.
756
757Int_t TProof::Init(const char *, const char *conffile,
758 const char *confdir, Int_t loglevel, const char *alias)
759{
 // The first argument is unnamed and therefore unused here.
 // Returns 0 on the visible failure paths (sandbox, package directory or
 // StartSlaves failure), otherwise the size of fActiveSlaves.
 // NOTE(review): many statements of this function are absent from this
 // listing (the numbering jumps); the notes below describe only what is visible.
761
762 fValid = kFALSE;
763
764 // Connected to terminal?
 // Both stdin (fd 0) and stdout (fd 1) must be terminals for fTty to be true
765 fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
766
767 // If in attach mode, options is filled with additional info
 // Any non-empty option string in the URL switches 'attach' on
768 Bool_t attach = kFALSE;
769 if (strlen(fUrl.GetOptions()) > 0) {
770 attach = kTRUE;
771 // A flag from the GUI
772 TString opts = fUrl.GetOptions();
773 if (opts.Contains("GUI")) {
 // Cut the options at the position of "GUI" and write them back
775 opts.Remove(opts.Index("GUI"));
776 fUrl.SetOptions(opts);
777 }
778 }
779
781 // Fill default conf file and conf dir
 // NOTE(review): the enclosing condition and the assignments of the defaults
 // are not visible in this listing
782 if (!conffile || !conffile[0])
784 if (!confdir || !confdir[0])
786 // The group; the client receives it in the kPROOF_SESSIONTAG message
788 } else {
789 fConfDir = confdir;
790 fConfFile = conffile;
791 }
792
793 // Analyse the conffile field
 // "workers=0" is treated as a synonym of "masteronly"
794 if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
796
798 fLogLevel = loglevel;
801 fImage = fMasterServ ? "" : "<local>";
802 fIntHandler = 0;
803 fStatus = 0;
804 fRecvMessages = new TList;
806 fSlaveInfo = 0;
807 fChains = new TList;
810 fRunningDSets = 0;
812 fInputData = 0;
814 fPrintProgress = 0;
815
816 // Timeout for some collect actions
817 fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
818
819 // Should the workers be started dynamically; default: no
820 fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
821
822 // Default entry point for the data pool is the master
 // NOTE(review): the condition selecting between the two branches is not visible
824 fDataPoolUrl.Form("root://%s", fMaster.Data());
825 else
826 fDataPoolUrl = "";
827
828 fProgressDialog = 0;
830
831 // Default alias is the master name
832 TString al = (alias) ? alias : fMaster.Data();
833 SetAlias(al);
834
835 // Client logging of messages from the master and slaves
 // The same temporary file (named after the process id) is opened once for
 // writing and once for reading; a failure is only reported, not fatal.
838 fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
839 if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
840 Error("Init", "could not create temporary logfile");
841 if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
842 Error("Init", "could not open temp logfile for reading");
843 }
845
846 // Status of cluster
847 fNotIdle = 0;
848 // Query type
 // fSync is false when attaching, true otherwise
849 fSync = (attach) ? kFALSE : kTRUE;
850 // Not enqueued
852
853 // Counters
854 fBytesRead = 0;
855 fRealTime = 0;
856 fCpuTime = 0;
857
858 // List of queries
859 fQueries = 0;
860 fOtherQueries = 0;
861 fDrawQueries = 0;
862 fMaxDrawQueries = 1;
863 fSeqNum = 0;
864
865 // Remote ID of the session
866 fSessionID = -1;
867
868 // Part of active query
869 fWaitingSlaves = 0;
870
871 // Make remote PROOF player
872 fPlayer = 0;
873 MakePlayer();
874
875 fFeedback = new TList;
877 fFeedback->SetName("FeedbackList");
879
880 // sort slaves by descending performance index
 // NOTE(review): the creation of the sorted list this comment refers to is
 // not visible in this listing
882 fActiveSlaves = new TList;
884 fUniqueSlaves = new TList;
887 fBadSlaves = new TList;
888 fAllMonitor = new TMonitor;
892 fCurrentMonitor = 0;
893
896
897 fLoadedMacros = 0;
898 fPackMgr = 0;
899
900 // Enable optimized sending of streamer infos to use embedded backward/forward
901 // compatibility support between different ROOT versions and different versions of
902 // users classes
 // Controlled by the "Proof.SchemaEvolution" setting, on (1) by default
903 Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
904 if (enableSchemaEvolution) {
906 } else {
907 Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
908 }
909
910 if (IsMaster()) {
911 // to make UploadPackage() method work on the master as well.
913 } else {
914
 // Not a master: make sure the sandbox exists (second argument kTRUE asks
 // GetSandbox to assert the path) and give up on failure
915 TString sandbox;
916 if (GetSandbox(sandbox, kTRUE) != 0) {
917 Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
918 return 0;
919 }
920
921 // Package Dir
 // "Proof.PackageDir" if set, otherwise <sandbox>/<kPROOF_PackDir>
922 TString packdir = gEnv->GetValue("Proof.PackageDir", "");
923 if (packdir.IsNull())
924 packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
925 if (AssertPath(packdir, kTRUE) != 0) {
926 Error("Init", "failure asserting directory %s", packdir.Data());
927 return 0;
928 }
929 fPackMgr = new TPackMgr(packdir);
930 if (gDebug > 0)
931 Info("Init", "package directory set to %s", packdir.Data());
932 }
933
934 if (!IsMaster()) {
935 // List of directories where to look for global packages
936 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
938 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
939 if (gDebug > 0)
940 Info("Init", " %d global package directories registered", nglb);
941 }
942
943 // Master may want dynamic startup
944 if (fDynamicStartup) {
945 if (!IsMaster()) {
946 // If on client - start the master
947 if (!StartSlaves(attach))
948 return 0;
949 }
950 } else {
951
952 // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
953 // result retrieving, ...)
 // Slaves are started unless this is a master with "Proof.MasterOnly" set
954 Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
955 if (!IsMaster() || !masterOnly) {
956 // Start slaves (the old, static, per-session way)
957 if (!StartSlaves(attach))
958 return 0;
959 // Client: Is Master in dynamic startup mode?
960 if (!IsMaster()) {
961 Int_t dyn = 0;
962 GetRC("Proof.DynamicStartup", dyn);
963 if (dyn != 0) fDynamicStartup = kTRUE;
964 }
965 }
966 }
967 // we are now properly initialized
968 fValid = kTRUE;
969
970 // De-activate monitor (will be activated in Collect)
972
973 // By default go into parallel mode
 // Default worker count: -1 for remote protocol > 35, 9999 otherwise; a
 // purely numeric PROOF_NWORKERS entry in the env-var list overrides it
974 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
975 TNamed *n = 0;
976 if (TProof::GetEnvVars() &&
977 (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
978 TString s(n->GetTitle());
979 if (s.IsDigit()) nwrk = s.Atoi();
980 }
981 GoParallel(nwrk, attach);
982
983 // Send relevant initial state to slaves
 // NOTE(review): the statements of both branches are not visible in this listing
984 if (!attach)
986 else if (!IsIdle())
987 // redirect log
989
990 // Done at this point, the alias will be communicated to the coordinator, if any
992 SetAlias(al);
993
995
996 if (IsValid()) {
997
998 // Activate input handler
1000
1002 gROOT->GetListOfSockets()->Add(this);
1003 }
1004
1005 AskParallel();
1006
 // Number of entries currently in the active-slaves list
1007 return fActiveSlaves->GetSize();
1008}
1009
1010////////////////////////////////////////////////////////////////////////////////
1011/// Set 'sb' to the sandbox path taken from 'Proof.Sandbox' or the alternative var 'rc'.
1012/// Use the existing setting or the default if nothing is found.
1013/// If 'assert' is kTRUE, make also sure that the path exists.
1014/// Return 0 on success, -1 on failure
1015
1016Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1017{
 // sb     : in/out; its incoming value is the default for the 'rc' lookup and
 //          it receives the resolved path
 // assert : when true, AssertPath() is called on the result
 // rc     : optional name of an alternative configuration variable
 // Returns -1 only when 'assert' is set and AssertPath() reports a failure.
1018 // Get it from 'rc', if defined
1019 if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1020 // Or use the default 'rc'
1021 if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1022 // If nothing found, use the default
 // i.e. kPROOF_WorkDir under the home directory ("~")
1023 if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1024 // Expand special settings
 // "." is replaced by the current working directory, ".." by the directory
 // name of the current working directory
1025 if (sb == ".") {
1026 sb = gSystem->pwd();
1027 } else if (sb == "..") {
1028 sb = gSystem->GetDirName(gSystem->pwd());
1029 }
 // NOTE(review): one statement (listing line 1030) is missing here;
 // presumably it expands '~' and similar in 'sb' - confirm against the full source
1031
1032 // Assert the path, if required
1033 if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1034 // Done
1035 return 0;
1036}
1037
1038////////////////////////////////////////////////////////////////////////////////
1039/// The config file field may contain special instructions which need to be
1040/// parsed at the beginning, e.g. for debug runs with valgrind.
1041/// Several options can be given separated by a ','
1042
1043void TProof::ParseConfigField(const char *config)
1044{
1045 TString sconf(config), opt;
1046 Ssiz_t from = 0;
1047 Bool_t cpuPin = kFALSE;
1048
1049 // Analysise the field
1050 const char *cq = (IsLite()) ? "\"" : "";
1051 while (sconf.Tokenize(opt, from, ",")) {
1052 if (opt.IsNull()) continue;
1053
1054 if (opt.BeginsWith("valgrind")) {
1055 // Any existing valgrind setting? User can give full settings, which we fully respect,
1056 // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1057 // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1058 // will add option "--time-stamp --leak-check=full" to our default options
1059 TString mst, top, sub, wrk, all;
1060 TList *envs = fgProofEnvList;
1061 TNamed *n = 0;
1062 if (envs) {
1063 if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1064 all = n->GetTitle();
1065 if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1066 mst = n->GetTitle();
1067 if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1068 top = n->GetTitle();
1069 if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1070 sub = n->GetTitle();
1071 if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1072 wrk = n->GetTitle();
1073 }
1074 if (all != "" && mst == "") mst = all;
1075 if (all != "" && top == "") top = all;
1076 if (all != "" && sub == "") sub = all;
1077 if (all != "" && wrk == "") wrk = all;
1078 if (all != "" && all.BeginsWith("valgrind_opts:")) {
1079 // The field is used to add an option Reset the setting
1080 Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1081 " must be set again for next run , if any");
1082 TProof::DelEnvVar("PROOF_WRAPPERCMD");
1083 }
1084 TString var, cmd;
1085 cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1086 TString mstlab("NO"), wrklab("NO");
1087 Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1088 !opt.Contains("topmaster") && !opt.Contains("submaster")))
1089 ? kTRUE : kFALSE;
1090 if (doMaster) {
1091 if (!IsLite()) {
1092 // Check if we have to add a var
1093 if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1094 mst.ReplaceAll("valgrind_opts:","");
1095 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1096 TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1097 mstlab = "YES";
1098 } else if (mst != "") {
1099 mstlab = "YES";
1100 }
1101 } else {
1102 if (opt.Contains("master")) {
1103 Warning("ParseConfigField",
1104 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1105 opt.ReplaceAll("master", "");
1106 if (!opt.Contains("workers")) return;
1107 }
1108 if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1109 }
1110 }
1111 if (opt.Contains("topmaster")) {
1112 // Check if we have to add a var
1113 if (top == "" || top.BeginsWith("valgrind_opts:")) {
1114 top.ReplaceAll("valgrind_opts:","");
1115 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1116 TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1117 mstlab = "YES";
1118 } else if (top != "") {
1119 mstlab = "YES";
1120 }
1121 }
1122 if (opt.Contains("submaster")) {
1123 // Check if we have to add a var
1124 if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1125 sub.ReplaceAll("valgrind_opts:","");
1126 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1127 TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1128 mstlab = "YES";
1129 } else if (sub != "") {
1130 mstlab = "YES";
1131 }
1132 }
1133 if (opt.Contains("=workers") || opt.Contains("+workers")) {
1134 // Check if we have to add a var
1135 if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1136 wrk.ReplaceAll("valgrind_opts:","");
1137 var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1138 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1139 TString nwrks("2");
1140 Int_t inw = opt.Index('#');
1141 if (inw != kNPOS) {
1142 nwrks = opt(inw+1, opt.Length());
1143 if (!nwrks.IsDigit()) nwrks = "2";
1144 }
1145 // Set the relevant variables
1146 if (!IsLite()) {
1147 TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1148 } else {
1149 gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1150 }
1151 wrklab = nwrks;
1152 // Register the additional worker log in the session file
1153 // (for the master this is done automatically)
1154 TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1155 } else if (wrk != "") {
1156 wrklab = "ALL";
1157 }
1158 }
1159 // Increase the relevant timeouts
1160 if (!IsLite()) {
1161 TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1162 gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1163 } else {
1164 gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1165 }
1166 // Warn for slowness
1167 Printf(" ");
1168 if (!IsLite()) {
1169 Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1170 } else {
1171 Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1172 }
1173 Printf(" ---> Please be patient: startup may be VERY slow ...");
1174 Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1175 Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1176 Printf(" ");
1177
1178 } else if (opt.BeginsWith("igprof-pp")) {
1179
1180 // IgProf profiling on master and worker. PROOF does not set the
1181 // environment for you: proper environment variables (like PATH and
1182 // LD_LIBRARY_PATH) should be set externally
1183
1184 Printf("*** Requested IgProf performance profiling ***");
1185 TString addLogExt = "__igprof.pp__.log";
1186 TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1187 TString tmp;
1188
1189 if (IsLite()) {
1190 addLogFmt.Append("\"");
1191 addLogFmt.Prepend("\"");
1192 }
1193
1194 tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1195 TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1196
1197 tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1198 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1199
1200 TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1201
1202 } else if (opt.BeginsWith("cpupin=")) {
1203 // Enable CPU pinning. Takes as argument the list of processor IDs
1204 // that will be used in order. Processor IDs are numbered from 0,
1205 // use likwid to see how they are organized. A possible parameter
1206 // format would be:
1207 //
1208 // cpupin=3+4+0+9+10+22+7
1209 //
1210 // Only the specified processor IDs will be used in a round-robin
1211 // fashion, dealing with the fact that you can request more workers
1212 // than the number of processor IDs you have specified.
1213 //
1214 // To use all available processors in their order:
1215 //
1216 // cpupin=*
1217
1218 opt.Remove(0, 7);
1219
1220 // Remove any char which is neither a number nor a plus '+'
1221 for (Ssiz_t i=0; i<opt.Length(); i++) {
1222 Char_t c = opt[i];
1223 if ((c != '+') && ((c < '0') || (c > '9')))
1224 opt[i] = '_';
1225 }
1226 opt.ReplaceAll("_", "");
1227 TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1228 cpuPin = kTRUE;
1229 } else if (opt.BeginsWith("workers=")) {
1230
1231 // Request for a given number of workers (within the max) or worker
1232 // startup combination:
1233 // workers=5 start max 5 workers (or less, if less are assigned)
1234 // workers=2x start max 2 workers per node (or less, if less are assigned)
1235 opt.ReplaceAll("workers=","");
1236 TProof::AddEnvVar("PROOF_NWORKERS", opt);
1237 }
1238 }
1239
1240 // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1241 #ifdef R__LINUX
1242 if (IsLite() && cpuPin) {
1243 Printf("*** Requested CPU pinning ***");
1244 const TList *ev = GetEnvVars();
1245 const char *pinCmd = "taskset -c <cpupin>";
1246 TString val;
1247 TNamed *p;
1248 if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1249 val = p->GetTitle();
1250 val.Insert(val.Length()-1, " ");
1251 val.Insert(val.Length()-1, pinCmd);
1252 }
1253 else {
1254 val.Form("\"%s\"", pinCmd);
1255 }
1256 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1257 }
1258 #endif
1259}
1260
1261////////////////////////////////////////////////////////////////////////////////
1262/// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1263/// that the path is writable
1264
1265Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1266{
1267 if (!inpath || strlen(inpath) <= 0) {
1268 Error("AssertPath", "undefined input path");
1269 return -1;
1270 }
1271
1272 TString path(inpath);
1273 gSystem->ExpandPathName(path);
1274
1275 if (gSystem->AccessPathName(path, kFileExists)) {
1276 if (gSystem->mkdir(path, kTRUE) != 0) {
1277 Error("AssertPath", "could not create path %s", path.Data());
1278 return -1;
1279 }
1280 }
1281 // It must be writable
1282 if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1283 if (gSystem->Chmod(path, 0666) != 0) {
1284 Error("AssertPath", "could not make path %s writable", path.Data());
1285 return -1;
1286 }
1287 }
1288
1289 // Done
1290 return 0;
1291}
1292
1293////////////////////////////////////////////////////////////////////////////////
1294/// Set manager and schedule its destruction after this for clean
1295/// operations.
1296
// NOTE(review): the signature of this definition (listing line 1297) and listing
// line 1302 inside the 'if' below are not present in this extract.
1298{
    // Record the manager; a null pointer is stored as is.
1299 fManager = mgr;
1300
1301 if (mgr) {
    // Re-register 'mgr' in the global list of sockets: any existing entry is
    // removed first and the manager is then added again (per the function
    // header, so that it gets destroyed after this object).
1303 gROOT->GetListOfSockets()->Remove(mgr);
1304 gROOT->GetListOfSockets()->Add(mgr);
1305 }
1306}
1307
1308////////////////////////////////////////////////////////////////////////////////
1309/// Works on the master node only.
1310/// It starts workers on the machines in workerList and sets the paths,
1311/// packages and macros as on the master.
1312/// It is a substitute for StartSlaves(...)
1313/// The code is mostly the master part of StartSlaves,
1314/// with the parallel startup removed.
1315
// NOTE(review): several lines of this definition are absent from this extract: the
// signature (listing line 1316) and listing lines 1330, 1332, 1377, 1388-1389, 1417,
// 1420, 1455, 1458, 1495 and 1501. The declarations of 'rmsi' and 'm' used below sit
// on such lines; the comments here describe only what is visible.
1317{
    // Only a master can add workers: return -1 otherwise.
1318 if (!IsMaster()) {
1319 Error("AddWorkers", "AddWorkers can only be called on the master!");
1320 return -1;
1321 }
1322
    // A null or empty list is an error: return -2.
1323 if (!workerList || !(workerList->GetSize())) {
1324 Error("AddWorkers", "empty list of workers!");
1325 return -2;
1326 }
1327
1328 // Code taken from master part of StartSlaves with the parallel part removed
1329
    // NOTE(review): the statement controlled by this 'if' is on a missing line.
1331 if (fImage.IsNull())
1333
1334 // Get all workers
1335 UInt_t nSlaves = workerList->GetSize();
1336 UInt_t nSlavesDone = 0;
1337 Int_t ord = 0;
1338
1339 // Loop over all new workers and start them (if we had already workers it means we are
1340 // increasing parallelism or that is not the first time we are called)
1341 Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1342
1343 // A list of TSlave objects for workers that are being added
1344 TList *addedWorkers = new TList();
1345 if (!addedWorkers) {
1346 // This is needed to silence Coverity ...
1347 Error("AddWorkers", "cannot create new list for the workers to be added");
1348 return -2;
1349 }
    // The list only references the TSlave objects; it does not own them.
1350 addedWorkers->SetOwner(kFALSE);
1351 TListIter next(workerList);
1352 TObject *to;
1353 TProofNodeInfo *worker;
    // Scratch object: its ordinal is set for each worker inside the loop and it
    // is deleted right after the loop.
1354 TSlaveInfo *dummysi = new TSlaveInfo();
    // First pass: create one TSlave per entry of 'workerList'.
1355 while ((to = next())) {
1356 // Get the next worker from the list
1357 worker = (TProofNodeInfo *)to;
1358
1359 // Read back worker node info
    // NOTE(review): these pointers stay valid only if GetImage()/GetWorkDir()
    // return references rather than temporaries - confirm against TProofNodeInfo.
1360 const Char_t *image = worker->GetImage().Data();
1361 const Char_t *workdir = worker->GetWorkDir().Data();
1362 Int_t perfidx = worker->GetPerfIndex();
1363 Int_t sport = worker->GetPort();
    // A port of -1 means "not set": fall back to the port of fUrl.
1364 if (sport == -1)
1365 sport = fUrl.GetPort();
1366
1367 // Create worker server
    // Full ordinal: "<ordinal of gProofServ>.<ordinal of the node>" or, when the
    // node has no ordinal, "<ordinal of gProofServ>.<running index 'ord'>".
1368 TString fullord;
1369 if (worker->GetOrdinal().Length() > 0) {
1370 fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1371 } else {
1372 fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1373 }
1374
1375 // Remove worker from the list of workers terminated gracefully
1376 dummysi->SetOrdinal(fullord);
    // NOTE(review): 'rmsi' is declared on a line missing from this extract.
1378 SafeDelete(rmsi);
1379
1380 // Create worker server
    // "localhost" (or "localhost.<domain>") is replaced by the local host name.
1381 TString wn(worker->GetNodeName());
1382 if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1383 TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1384 // Add group info in the password field, if any
1385 if (strlen(gProofServ->GetGroup()) > 0) {
1386 // Set also the user, otherwise the password is not exported
    // NOTE(review): the body of this 'if' and the statement setting the
    // password are on missing lines.
1387 if (strlen(u.GetUser()) <= 0)
1390 }
    // Plain workers and submasters are created through different factories.
1391 TSlave *slave = 0;
1392 if (worker->IsWorker()) {
1393 slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1394 } else {
1395 slave = CreateSubmaster(u.GetUrl(), fullord,
1396 image, worker->GetMsd(), worker->GetNWrks());
1397 }
1398
1399 // Add to global list (we will add to the monitor list after
1400 // finalizing the server startup)
    // Every created slave goes to fSlaves; valid ones are also queued in
    // 'addedWorkers', invalid ones are also recorded in fBadSlaves.
    // NOTE(review): 'slave' is dereferenced without a null check - assumes the
    // factories never return a null pointer; confirm.
1401 Bool_t slaveOk = kTRUE;
1402 fSlaves->Add(slave);
1403 if (slave->IsValid()) {
1404 addedWorkers->Add(slave);
1405 } else {
1406 slaveOk = kFALSE;
1407 fBadSlaves->Add(slave);
1408 Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1409 }
1410
1411 PDB(kGlobal,3)
1412 Info("AddWorkers", "worker on host %s created"
1413 " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1414
1415 // Notify opening of connection
    // NOTE(review): 'm' is declared on a missing line and the statement that
    // uses the filled message is missing as well.
1416 nSlavesDone++;
1418 m << TString("Opening connections to workers") << nSlaves
1419 << nSlavesDone << slaveOk;
1421
1422 ord++;
1423 } //end of the worker loop
1424 SafeDelete(dummysi);
1425
1426 // Cleanup
    // The input list is deleted here; the early returns above leave it untouched.
1427 SafeDelete(workerList);
1428
    // The counter is reused for the progress messages of the second pass.
1429 nSlavesDone = 0;
1430
1431 // Here we finalize the server startup: in this way the bulk
1432 // of remote operations are almost parallelized
1433 TIter nxsl(addedWorkers);
1434 TSlave *sl = 0;
1435 while ((sl = (TSlave *) nxsl())) {
1436
1437 // Finalize setup of the server
1438 if (sl->IsValid())
1439 sl->SetupServ(TSlave::kSlave, 0);
1440
1441 // Monitor good slaves
    // Validity is checked again after SetupServ(): slaves found invalid at this
    // point are recorded in fBadSlaves instead of being monitored.
1442 Bool_t slaveOk = kTRUE;
1443 if (sl->IsValid()) {
1444 fAllMonitor->Add(sl->GetSocket());
1445 PDB(kGlobal,3)
1446 Info("AddWorkers", "worker on host %s finalized"
1447 " and added to list", sl->GetOrdinal());
1448 } else {
1449 slaveOk = kFALSE;
1450 fBadSlaves->Add(sl);
1451 }
1452
1453 // Notify end of startup operations
1454 nSlavesDone++;
1456 m << TString("Setting up worker servers") << nSlaves
1457 << nSlavesDone << slaveOk;
1459 }
1460
1461 // Now set new state on the added workers (on all workers for simplicity)
1462 // use fEnabledPackages, fLoadedMacros,
1463 // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1464 // no need to load packages that are only loaded and not enabled (dyn mode)
    // Number of workers passed to GoParallel()/GoMoreParallel(): -1 when the
    // remote protocol is above 35, else 9999; an all-digit PROOF_NWORKERS
    // environment setting overrides this default.
1465 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1466 TNamed *n = 0;
1467 if (TProof::GetEnvVars() &&
1468 (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1469 TString s(n->GetTitle());
1470 if (s.IsDigit()) nwrk = s.Atoi();
1471 }
1472
    // With dynamic startup and workers already present, only extend the current
    // set; otherwise go through the regular GoParallel().
1473 if (fDynamicStartup && goMoreParallel) {
1474
1475 PDB(kGlobal, 3)
1476 Info("AddWorkers", "will invoke GoMoreParallel()");
1477 Int_t nw = GoMoreParallel(nwrk);
1478 PDB(kGlobal, 3)
1479 Info("AddWorkers", "GoMoreParallel()=%d", nw);
1480
1481 }
1482 else {
1483 // Not in Dynamic Workers mode
1484 PDB(kGlobal, 3)
1485 Info("AddWorkers", "will invoke GoParallel()");
1486 GoParallel(nwrk, kFALSE, 0);
1487 }
1488
1489 // Set worker processing environment
1490 SetupWorkersEnv(addedWorkers, goMoreParallel);
1491
1492 // Update list of current workers
    // NOTE(review): the call announced by the Info() below is on a missing line.
1493 PDB(kGlobal, 3)
1494 Info("AddWorkers", "will invoke SaveWorkerInfo()");
1496
1497 // Inform the client that the number of workers has changed
1498 if (fDynamicStartup && gProofServ) {
    // NOTE(review): the call announced by the Info() below is on a missing line.
1499 PDB(kGlobal, 3)
1500 Info("AddWorkers", "will invoke SendParallel()");
1502
1503 if (goMoreParallel && fPlayer) {
1504 // In case we are adding workers dynamically to an existing process, we
1505 // should invoke a special player's Process() to set only added workers
1506 // to the proper state
1507 PDB(kGlobal, 3)
1508 Info("AddWorkers", "will send the PROCESS message to selected workers");
1509 fPlayer->JoinProcess(addedWorkers);
1510 // Update merger counters (new workers are not yet active)
1511 fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1512 }
1513 }
1514
1515 // Cleanup
    // Only the list container is deleted: it was set as non-owner above.
1516 delete addedWorkers;
1517
1518 return 0;
1519}
1520
1521////////////////////////////////////////////////////////////////////////////////
1522/// Set up packages, loaded macros, include and lib paths ...
1523
// Propagate the processing environment to workers: enabled packages, loaded
// macros, dynamic library path and include path.
//   addedWorkers      - list of workers passed on to the upload/enable/load/path calls
//   increasingWorkers - with fDynamicStartup set, restricts package handling to
//                       'addedWorkers' instead of all workers
// NOTE(review): listing lines 1569 and 1577 (which declare 'dyn' and 'inc') are
// absent from this extract.
1524void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1525{
    // Prefer the package list provided by gProofServ, when there is one; it is
    // emptied and deleted further down, i.e. treated as owned by this function.
1526 TList *server_packs = gProofServ ? gProofServ->GetEnabledPackages() : nullptr;
1527 // Packages
1528 TList *packs = server_packs ? server_packs : GetEnabledPackages();
1529 if (packs && packs->GetSize() > 0) {
1530 TIter nxp(packs);
1531 TPair *pck = 0;
    // Each entry is used as a TPair: its name is the package name and its value
    // is passed to EnablePackage() as a TList.
1532 while ((pck = (TPair *) nxp())) {
1533 // Upload and Enable methods are intelligent and avoid
1534 // re-uploading or re-enabling of a package to a node that has it.
1535 if (fDynamicStartup && increasingWorkers) {
1536 // Upload only on added workers
1537 PDB(kGlobal, 3)
1538 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
    // The package is enabled only if the upload did not fail (>= 0).
1539 if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1540 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1541 } else {
1542 PDB(kGlobal, 3)
1543 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1544 if (UploadPackage(pck->GetName()) >= 0)
1545 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1546 }
1547 }
1548 }
1549
    // Release the list obtained from gProofServ, together with its content.
1550 if (server_packs) {
1551 server_packs->Delete();
1552 delete server_packs;
1553 }
1554
1555 // Loaded macros
1556 if (fLoadedMacros) {
1557 TIter nxp(fLoadedMacros);
1558 TObjString *os = 0;
1559 while ((os = (TObjString *) nxp())) {
1560 PDB(kGlobal, 3) {
1561 Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1562 Printf("Loading a macro : %s", os->GetName());
1563 }
1564 Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1565 }
1566 }
1567
1568 // Dynamic path
    // ':' separators and double quotes are turned into blanks before the path
    // is forwarded to the workers.
1570 dyn.ReplaceAll(":", " ");
1571 dyn.ReplaceAll("\"", " ");
1572 PDB(kGlobal, 3)
1573 Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1574 AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1575
1576 // Include path
    // "-I" prefixes and double quotes are turned into blanks before the path
    // is forwarded to the workers.
1578 inc.ReplaceAll("-I", " ");
1579 inc.ReplaceAll("\"", " ");
1580 PDB(kGlobal, 3)
1581 Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1582 AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1583
1584 // Done
1585 return;
1586}
1587
1588////////////////////////////////////////////////////////////////////////////////
1589/// Used for shutting down the workers after a query is finished.
1590/// Sends each of the workers from the workerList, a kPROOF_STOP message.
1591/// If the workerList == 0, shutdown all the workers.
1592
// NOTE(review): the signature of this definition (listing line 1593) is not
// present in this extract.
// Returns 0 on success, -1 when not called on a master, -2 when a non-null but
// empty list is passed.
1594{
1595 if (!IsMaster()) {
1596 Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1597 return -1;
1598 }
1599
1600 fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1601
1602 if (!workerList) {
1603 // shutdown all the workers
    // NOTE(review): if TerminateWorker() modifies fSlaves, this iteration over
    // the same list would be affected - confirm.
1604 TIter nxsl(fSlaves);
1605 TSlave *sl = 0;
1606 while ((sl = (TSlave *) nxsl())) {
1607 // Shut down the worker assuming that it is not processing
1608 TerminateWorker(sl);
1609 }
1610
1611 } else {
1612 if (!(workerList->GetSize())) {
1613 Error("RemoveWorkers", "The list of workers should not be empty!");
1614 return -2;
1615 }
1616
1617 // Loop over all the workers and stop them
    // Entries may be TProofNodeInfo objects (matched against fSlaves by name)
    // or objects inheriting from TSlave (used directly); anything else is
    // reported with a warning and skipped.
1618 TListIter next(workerList);
1619 TObject *to;
1620 TProofNodeInfo *worker;
1621 while ((to = next())) {
1622 TSlave *sl = 0;
    // The class name must match exactly here: classes derived from
    // TProofNodeInfo do not take this branch.
1623 if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1624 // Get the next worker from the list
1625 worker = (TProofNodeInfo *)to;
1626 TIter nxsl(fSlaves);
    // Stop at the first slave whose name equals the node name; 'sl' ends up
    // null when no slave matches.
1627 while ((sl = (TSlave *) nxsl())) {
1628 // Shut down the worker assuming that it is not processing
1629 if (sl->GetName() == worker->GetNodeName())
1630 break;
1631 }
1632 } else if (to->InheritsFrom(TSlave::Class())) {
1633 sl = (TSlave *) to;
1634 } else {
1635 Warning("RemoveWorkers","unknown object type: %s - it should be"
1636 " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1637 }
1638 // Shut down the worker assuming that it is not processing
1639 if (sl) {
1640 if (gDebug > 0)
1641 Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1642 TerminateWorker(sl);
1643 }
1644 }
1645 }
1646
1647 // Update also the master counter
    // Done only when no slave is left in fSlaves.
1648 if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1649
1650 return 0;
1651}
1652
1653////////////////////////////////////////////////////////////////////////////////
1654/// Start up PROOF slaves.
1655
// NOTE(review): several lines of this definition are absent from this extract: the
// signature (listing line 1656), the condition opening the first branch (1660, whose
// 'else' is visible below) and listing lines 1668, 1694, 1707, 1717, 1720 and 1773.
// Returns kTRUE on success and kFALSE on the failures handled below.
1657{
1658 // If this is a master server, find the config file and start slave
1659 // servers as specified in the config file
1661
1662 Int_t pc = 0;
1663 TList *workerList = new TList;
1664 // Get list of workers
    // NOTE(review): 'workerList' is not deleted on the visible part of this
    // early-return path (listing line 1668 is not shown) - check for a leak.
1665 if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1666 TString emsg("no resource currently available for this session: please retry later");
1667 if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1669 return kFALSE;
1670 }
1671 // Setup the workers
1672 if (AddWorkers(workerList) < 0)
1673 return kFALSE;
1674
1675 } else {
1676
1677 // create master server
    // The master is created as a submaster with ordinal "0" and image "master".
1678 Printf("Starting master: opening connection ...");
1679 TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1680
1681 if (slave->IsValid()) {
1682
1683 // Notify
1684 fprintf(stderr,"Starting master:"
1685 " connection open: setting up server ... \r");
1686 StartupMessage("Connection to master opened", kTRUE, 1, 1);
1687
    // 'attach' selects between a full setup of the server (this branch) and
    // the lighter path further below, which only registers the slave.
1688 if (!attach) {
1689
1690 // Set worker interrupt handler
1691 slave->SetInterruptHandler(kTRUE);
1692
1693 // Finalize setup of the server
    // NOTE(review): the setup call itself is on a missing line.
1695
1696 if (slave->IsValid()) {
1697
1698 // Notify
1699 Printf("Starting master: OK ");
1700 StartupMessage("Master started", kTRUE, 1, 1);
1701
1702 // check protocol compatibility
1703 // protocol 1 is not supported anymore
1704 if (fProtocol == 1) {
1705 Error("StartSlaves",
1706 "client and remote protocols not compatible (%d and %d)",
1708 slave->Close("S");
1709 delete slave;
1710 return kFALSE;
1711 }
1712
1713 fSlaves->Add(slave);
1714 fAllMonitor->Add(slave->GetSocket());
1715
1716 // Unset worker interrupt handler
1718
1719 // Set interrupt PROOF handler from now on
1721
1722 // Give-up after 5 minutes
    // Failure cases: slave status -99 or -98, or a Collect() return of 0. The
    // slave is then unregistered, closed and deleted.
1723 Int_t rc = Collect(slave, 300);
1724 Int_t slStatus = slave->GetStatus();
1725 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1726 fSlaves->Remove(slave);
1727 fAllMonitor->Remove(slave->GetSocket());
1728 if (slStatus == -99)
1729 Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1730 else if (slStatus == -98)
1731 Error("StartSlaves", "could not setup output redirection on master");
1732 else
1733 Error("StartSlaves", "setting up master");
1734 slave->Close("S");
1735 delete slave;
    // Literal 0 here, where the other failure paths return kFALSE.
1736 return 0;
1737 }
1738
1739 if (!slave->IsValid()) {
1740 fSlaves->Remove(slave);
1741 fAllMonitor->Remove(slave->GetSocket());
1742 slave->Close("S");
1743 delete slave;
1744 Error("StartSlaves",
1745 "failed to setup connection with PROOF master server");
1746 return kFALSE;
1747 }
1748
    // Look up the progress dialog plugin when not in batch mode and the
    // kUseProgressDialog bit is set; the handler is dropped if loading fails.
1749 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1750 if ((fProgressDialog =
1751 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1752 if (fProgressDialog->LoadPlugin() == -1)
1753 fProgressDialog = 0;
1754 }
1755 } else {
1756 // Notify
    // NOTE(review): this branch only prints; the function still reaches the
    // final 'return kTRUE' and 'slave' is neither registered nor deleted on
    // the visible lines - confirm this is intended.
1757 Printf("Starting master: failure");
1758 }
1759 } else {
1760
1761 // Notify
1762 Printf("Starting master: OK ");
1763 StartupMessage("Master attached", kTRUE, 1, 1);
1764
1765 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1766 if ((fProgressDialog =
1767 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1768 if (fProgressDialog->LoadPlugin() == -1)
1769 fProgressDialog = 0;
1770 }
1771
1772 fSlaves->Add(slave);
1774 }
1775
1776 } else {
1777 delete slave;
1778 // Notify only if verbosity is on: most likely the failure has already been notified
1779 if (gDebug > 0)
1780 Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1781 return kFALSE;
1782 }
1783 }
1784
1785 return kTRUE;
1786}
1787
1788////////////////////////////////////////////////////////////////////////////////
1789/// Close all open slave servers.
1790/// Client can decide to shut down the remote session by passing option 'S'
1791/// or 's'. Default for clients is detach, if supported. Masters always
1792/// shutdown the remote counterpart.
1793
// NOTE(review): the signature of this definition (listing line 1794) and listing
// lines 1801 and 1818 are absent from this extract; the closing brace at listing
// line 1841 pairs with a scope opened on the missing line 1818.
1795{
    // The slave lists are torn down while holding fCloseMutex.
1796 { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1797
    // Mark the session as invalid before touching the slaves.
1798 fValid = kFALSE;
1799 if (fSlaves) {
    // NOTE(review): the statement controlled by this 'if' is on a missing line.
1800 if (fIntHandler)
1802
    // Close every slave with the caller's option.
1803 TIter nxs(fSlaves);
1804 TSlave *sl = 0;
1805 while ((sl = (TSlave *)nxs()))
1806 sl->Close(opt);
1807
    // The secondary lists are cleared with "nodelete"; the TSlave objects are
    // deleted once, through fSlaves->Delete() below.
1808 fActiveSlaves->Clear("nodelete");
1809 fUniqueSlaves->Clear("nodelete");
1810 fAllUniqueSlaves->Clear("nodelete");
1811 fNonUniqueMasters->Clear("nodelete");
1812 fBadSlaves->Clear("nodelete");
1813 fInactiveSlaves->Clear("nodelete");
1814 fSlaves->Delete();
1815 }
1816 }
1817
    // Unregister this object from the global list of sockets.
1819 gROOT->GetListOfSockets()->Remove(this);
1820
1821 if (fChains) {
    // Detach the chains one by one; the loop relies on RemoveChain() taking
    // the chain out of fChains so that First() makes progress, and it stops at
    // the first entry that is not a TChain.
1822 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1823 // remove "chain" from list
1824 chain->SetProof(0);
1825 RemoveChain(chain);
1826 }
1827 }
1828
1829 if (IsProofd()) {
1830
1831 gROOT->GetListOfProofs()->Remove(this);
1832 if (gProof && gProof == this) {
1833 // Set previous proofd-related as default
    // The list is scanned backwards; gProof ends up null when no other
    // proofd-related session is found.
1834 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1835 while ((gProof = (TProof *)pvp())) {
1836 if (gProof->IsProofd())
1837 break;
1838 }
1839 }
1840 }
1841 }
1842}
1843
1844////////////////////////////////////////////////////////////////////////////////
1845/// Create a new TSlave of type TSlave::kSlave.
1846/// Note: creation of TSlave is private with TProof as a friend.
1847/// Derived classes must use this function to create slaves.
1848
1849TSlave *TProof::CreateSlave(const char *url, const char *ord,
1850 Int_t perf, const char *image, const char *workdir)
1851{
1852 TSlave* sl = TSlave::Create(url, ord, perf, image,
1853 this, TSlave::kSlave, workdir, 0);
1854
1855 if (sl->IsValid()) {
1856 sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1857 // must set fParallel to 1 for slaves since they do not
1858 // report their fParallel with a LOG_DONE message
1859 sl->fParallel = 1;
1860 }
1861
1862 return sl;
1863}
1864
1865
1866////////////////////////////////////////////////////////////////////////////////
1867/// Create a new TSlave of type TSlave::kMaster.
1868/// Note: creation of TSlave is private with TProof as a friend.
1869/// Derived classes must use this function to create slaves.
1870
1871TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1872 const char *image, const char *msd, Int_t nwk)
1873{
1874 TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1875 TSlave::kMaster, 0, msd, nwk);
1876
1877 if (sl->IsValid()) {
1878 sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1879 }
1880
1881 return sl;
1882}
1883
1884////////////////////////////////////////////////////////////////////////////////
1885/// Find slave that has TSocket s. Returns 0 in case slave is not found.
1886
// NOTE(review): the signature of this definition (listing line 1887) is not
// present in this extract; 's' is the socket being looked up.
1888{
1889 TSlave *sl;
1890 TIter next(fSlaves);
1891
    // Linear scan of fSlaves: only valid slaves are considered and the first
    // one whose socket is 's' is returned.
1892 while ((sl = (TSlave *)next())) {
1893 if (sl->IsValid() && sl->GetSocket() == s)
1894 return sl;
1895 }
    // No valid slave uses this socket.
1896 return 0;
1897}
1898
1899////////////////////////////////////////////////////////////////////////////////
1900/// Add to the fUniqueSlaves list the active slaves that have a unique
1901/// (user) file system image. This information is used to transfer files
1902/// only once to nodes that share a file system (an image). Submasters
1903/// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1904/// list. That list is used to trigger the transferring of files to
1905/// the submaster's unique slaves without the need to transfer the file
1906/// to the submaster.
1907
// NOTE(review): the signature of this definition (listing line 1908) and listing
// lines 1910-1914, 1921, 1940 and 1967-1968 are absent from this extract.
1909{
1915
1916 TIter next(fActiveSlaves);
1917
    // Outer loop over the active slaves; it ends at the first entry that is
    // not a TSlave (dynamic_cast yielding null).
1918 while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
    // Slaves sharing the image of this object are not added to fUniqueSlaves;
    // submasters among them are still recorded in the "all unique" containers.
1919 if (fImage == sl->fImage) {
1920 if (sl->GetSlaveType() == TSlave::kMaster) {
1922 fAllUniqueSlaves->Add(sl);
1923 fAllUniqueMonitor->Add(sl->GetSocket());
1924 }
1925 continue;
1926 }
1927
    // Compare against the slaves already registered as unique: 'add' stays
    // kTRUE when the image has not been seen, and 'replace_slave' is set when
    // a submaster has to take the place of a plain slave with the same image.
1928 TIter next2(fUniqueSlaves);
1929 TSlave *replace_slave = 0;
1930 Bool_t add = kTRUE;
1931 while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1932 if (sl->fImage == sl2->fImage) {
1933 add = kFALSE;
1934 if (sl->GetSlaveType() == TSlave::kMaster) {
1935 if (sl2->GetSlaveType() == TSlave::kSlave) {
1936 // give preference to master
1937 replace_slave = sl2;
1938 add = kTRUE;
1939 } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1941 fAllUniqueSlaves->Add(sl);
1942 fAllUniqueMonitor->Add(sl->GetSocket());
1943 } else {
1944 Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1945 R__ASSERT(0);
1946 }
1947 }
    // Only the first slave with the same image is examined.
1948 break;
1949 }
1950 }
1951
    // Register the slave in both unique containers and monitors; the slave it
    // replaces, if any, is removed from all four.
1952 if (add) {
1953 fUniqueSlaves->Add(sl);
1954 fAllUniqueSlaves->Add(sl);
1955 fUniqueMonitor->Add(sl->GetSocket());
1956 fAllUniqueMonitor->Add(sl->GetSocket());
1957 if (replace_slave) {
1958 fUniqueSlaves->Remove(replace_slave);
1959 fAllUniqueSlaves->Remove(replace_slave);
1960 fUniqueMonitor->Remove(replace_slave->GetSocket());
1961 fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1962 }
1963 }
1964 }
1965
1966 // will be activated in Collect()
1969}
1970
1971////////////////////////////////////////////////////////////////////////////////
1972/// Return number of slaves as described in the config file.
1973
// NOTE(review): the signature line (listing line 1974) is not present in this extract.
1975{
    // Number of entries currently in fSlaves (the list is assumed non-null).
1976 return fSlaves->GetSize();
1977}
1978
1979////////////////////////////////////////////////////////////////////////////////
1980/// Return number of active slaves, i.e. slaves that are valid and in
1981/// the current computing group.
1982
// NOTE(review): the signature line (listing line 1983) is not present in this extract.
1984{
    // Number of entries currently in fActiveSlaves (the list is assumed non-null).
1985 return fActiveSlaves->GetSize();
1986}
1987
1988////////////////////////////////////////////////////////////////////////////////
1989/// Return number of inactive slaves, i.e. slaves that are valid but not in
1990/// the current computing group.
1991
// NOTE(review): the signature line (listing line 1992) is not present in this extract.
1993{
    // Number of entries currently in fInactiveSlaves (the list is assumed non-null).
1994 return fInactiveSlaves->GetSize();
1995}
1996
1997////////////////////////////////////////////////////////////////////////////////
1998/// Return number of unique slaves, i.e. active slaves that have each a
1999/// unique, different user file system.
2000
// NOTE(review): the signature line (listing line 2001) is not present in this extract.
2002{
    // Number of entries currently in fUniqueSlaves (the list is assumed non-null).
2003 return fUniqueSlaves->GetSize();
2004}
2005
2006////////////////////////////////////////////////////////////////////////////////
2007/// Return number of bad slaves. These are slaves that were in the config
2008/// file but refused to start up, or that died during the PROOF session.
2009
// NOTE(review): the signature line (listing line 2010) is not present in this extract.
2011{
    // Number of entries currently in fBadSlaves (the list is assumed non-null).
2012 return fBadSlaves->GetSize();
2013}
2014
2015////////////////////////////////////////////////////////////////////////////////
2016/// Ask for the statistics of the slaves.
2017
// NOTE(review): the signature (listing line 2018) and the statements issuing the
// request (listing lines 2022-2023) are not present in this extract.
2019{
    // Nothing is done on an invalid session.
2020 if (!IsValid()) return;
2021
2024}
2025
2026////////////////////////////////////////////////////////////////////////////////
2027/// Get statistics about CPU time, real time and bytes read.
2028/// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2029/// and GetBytesRead()).
2030
// NOTE(review): the signature (listing line 2031) and listing lines 2038-2039, which
// declare 'rh' and presumably start the output redirection, are not present in this
// extract.
2032{
2033 if (fProtocol > 27) {
2034 // This returns the correct result
2035 AskStatistics();
2036 } else {
2037 // AskStatistics is buggy: parse the output of Print()
    // Print() is run, the redirection is ended and the text retrieved through
    // GetLastLog() is scanned line by line.
2040 Print();
2041 gSystem->RedirectOutput(0, 0, &rh);
2042 TMacro *mp = GetLastLog();
2043 if (mp) {
2044 // Look for global directories
    // Each recognised label is stripped from the line and the remainder is
    // converted only if it parses as a floating point number.
2045 TIter nxl(mp->GetListOfLines());
2046 TObjString *os = 0;
2047 while ((os = (TObjString *) nxl())) {
2048 TString s(os->GetName());
2049 if (s.Contains("Total MB's processed:")) {
2050 s.ReplaceAll("Total MB's processed:", "");
    // NOTE(review): the cast binds tighter than the multiplication, so the MB
    // value is truncated to an integer before being scaled to bytes - confirm
    // whether the fractional part is meant to be dropped.
2051 if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2052 } else if (s.Contains("Total real time used (s):")) {
2053 s.ReplaceAll("Total real time used (s):", "");
2054 if (s.IsFloat()) fRealTime = s.Atof();
2055 } else if (s.Contains("Total CPU time used (s):")) {
2056 s.ReplaceAll("Total CPU time used (s):", "");
2057 if (s.IsFloat()) fCpuTime = s.Atof();
2058 }
2059 }
    // The macro obtained from GetLastLog() is deleted here.
2060 delete mp;
2061 }
2062 }
2063
    // Optional one-line summary; the processed volume is printed in MB.
2064 if (verbose) {
2065 Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2066 GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2067 }
2068}
2069
2070////////////////////////////////////////////////////////////////////////////////
2071/// Ask for the number of parallel slaves.
2072
// NOTE(review): the signature (listing line 2073) and the statements issuing the
// request (listing lines 2077-2078) are not present in this extract.
2074{
    // Nothing is done on an invalid session.
2075 if (!IsValid()) return;
2076
2079}
2080
2081////////////////////////////////////////////////////////////////////////////////
2082/// Ask the master for the list of queries.
2083
// NOTE(review): the signature (listing line 2084), the declaration of 'm' (2089) and
// the statements following the stream insertion (2091-2092) are not present in this
// extract.
2085{
    // A null list is returned on an invalid session or when the kIsMaster bit is set.
2086 if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2087
    // 'all' is set when the option string contains 'A' or 'a'.
    // NOTE(review): 'opt' is passed to strchr() unchecked - assumes it is never null.
2088 Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
2090 m << all;
2093
2094 // This should have been filled by now
2095 return fQueries;
2096}
2097
2098////////////////////////////////////////////////////////////////////////////////
2099/// Number of queries processed by this session
2100
// NOTE(review): the signature line (listing line 2101) is not present in this extract.
2102{
    // Size of fQueries minus the fOtherQueries counter; 0 when there is no list.
2103 if (fQueries)
2104 return fQueries->GetSize() - fOtherQueries;
2105 return 0;
2106}
2107
2108////////////////////////////////////////////////////////////////////////////////
2109/// Set max number of draw queries whose results are saved
2110
// NOTE(review): the signature (listing line 2111) and listing line 2115 are not
// present in this extract. Line 2115 holds the statement controlled by
// 'if (fPlayer)', so the assignment shown after it is not the body of that 'if'.
2112{
    // Non-positive values are ignored.
2113 if (max > 0) {
2114 if (fPlayer)
2116 fMaxDrawQueries = max;
2117 }
2118}
2119
2120////////////////////////////////////////////////////////////////////////////////
2121/// Get max number of queries whose full results are kept in the
2122/// remote sandbox
2123
// NOTE(review): the signature (listing line 2124), the declaration of 'm' (2126) and
// the statements following the stream insertion (2128-2129) are not present in this
// extract.
2125{
    // kFALSE is streamed into the message 'm'.
2127 m << kFALSE;
2130}
2131
2132////////////////////////////////////////////////////////////////////////////////
2133/// Return pointer to the list of query results in the player
2134
// NOTE(review): the signature line (listing line 2135) is not present in this extract.
2136{
    // Delegates to the player; a null list is returned when there is no player.
2137 return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2138}
2139
2140////////////////////////////////////////////////////////////////////////////////
2141/// Return pointer to the full TQueryResult instance owned by the player
2142/// and referenced by 'ref'. If ref = 0 or "", return the last query result.
2143
// NOTE(review): the signature line (listing line 2144) is not present in this extract.
2145{
    // Delegates to the player, passing 'ref' through unchanged; a null pointer
    // is returned when there is no player.
2146 return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2147}
2148
2149////////////////////////////////////////////////////////////////////////////////
2150/// Ask the master for the list of queries.
2151/// Options:
2152/// "A" show information about all the queries known to the
2153/// server, i.e. even those processed by other sessions
2154/// "L" show only information about queries locally available
2155/// i.e. already retrieved. If "L" is specified, "A" is
2156/// ignored.
2157/// "F" show all details available about queries
2158/// "H" print help menu
2159/// Default ""
2160
// NOTE(review): the signature (listing line 2161) and listing lines 2209 and 2218,
// which carry the arguments of the two "selector/draw" Printf() calls, are not
// present in this extract.
2162{
    // The help request ('H' or 'h') is served first, even on an invalid session.
    // NOTE(review): 'opt' is passed to strchr() unchecked - assumes it is never null.
2163 Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2164 if (help) {
2165
2166 // Help
2167
2168 Printf("+++");
2169 Printf("+++ Options: \"A\" show all queries known to server");
2170 Printf("+++ \"L\" show retrieved queries");
2171 Printf("+++ \"F\" full listing of query info");
2172 Printf("+++ \"H\" print this menu");
2173 Printf("+++");
2174 Printf("+++ (case insensitive)");
2175 Printf("+++");
2176 Printf("+++ Use Retrieve(<#>) to retrieve the full"
2177 " query results from the master");
2178 Printf("+++ e.g. Retrieve(8)");
2179
2180 Printf("+++");
2181
2182 return;
2183 }
2184
2185 if (!IsValid()) return;
2186
    // 'L' or 'l' restricts the listing to what is available locally.
2187 Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2188
2189 TObject *pq = 0;
2190 if (!local) {
    // Refresh fQueries; nothing is printed when no list is available afterwards.
2191 GetListOfQueries(opt);
2192
2193 if (!fQueries) return;
2194
2195 TIter nxq(fQueries);
2196
2197 // Queries processed by other sessions
    // The first fOtherQueries entries of the list are printed in this section;
    // the same iterator then continues with the remaining entries below.
2198 if (fOtherQueries > 0) {
2199 Printf("+++");
2200 Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2201 Int_t nq = 0;
2202 while (nq++ < fOtherQueries && (pq = nxq()))
2203 pq->Print(opt);
2204 }
2205
2206 // Queries processed by this session
2207 Printf("+++");
2208 Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2210 while ((pq = nxq()))
2211 pq->Print(opt);
2212
2213 } else {
2214
2215 // Queries processed by this session
2216 Printf("+++");
2217 Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2219
2220 // Queries available locally
    // Taken from the player, when there is one.
2221 TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2222 if (listlocal) {
2223 Printf("+++");
2224 Printf("+++ Queries available locally: %d", listlocal->GetSize());
2225 TIter nxlq(listlocal);
2226 while ((pq = nxlq()))
2227 pq->Print(opt);
2228 }
2229 }
2230 Printf("+++");
2231}
2232
2233////////////////////////////////////////////////////////////////////////////////
2234/// See if the data is ready to be analyzed.
2235
// NOTE(review): the signature line (listing line 2236) is not present in this
// extract; 'totalbytes' and 'bytesready' are assigned below, i.e. they are outputs.
2237{
2238 if (!IsValid()) return kFALSE;
2239
    // Collect the active slaves of type kMaster (submasters); the scan ends at
    // the first entry that is not a TSlave.
2240 TList submasters;
2241 TIter nextSlave(GetListOfActiveSlaves());
2242 while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2243 if (sl->GetSlaveType() == TSlave::kMaster) {
2244 submasters.Add(sl);
2245 }
2246 }
2247
    // Reset the members before querying; they are not modified in this function
    // after this point, so any update has to come from the Broadcast/Collect
    // round below.
2248 fDataReady = kTRUE; //see if any submasters set it to false
2249 fBytesReady = 0;
2250 fTotalBytes = 0;
2251 //loop over submasters and see if data is ready
2252 if (submasters.GetSize() > 0) {
2253 Broadcast(kPROOF_DATA_READY, &submasters);
2254 Collect(&submasters);
2255 }
2256
    // Hand the totals back to the caller.
2257 bytesready = fBytesReady;
2258 totalbytes = fTotalBytes;
2259
    // The signal carries (totalbytes, bytesready), in this order.
2260 EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2261
2262 PDB(kGlobal,2)
2263 Info("IsDataReady", "%lld / %lld (%s)",
2264 bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2265
2266 return fDataReady;
2267}
2268
2269////////////////////////////////////////////////////////////////////////////////
2270/// Send interrupt to master or slave servers.
/// 'list' selects the slave list to use (kAll, kActive, kUnique or
/// kAllUnique); 'type' is passed, cast to Int_t, to TSlave::Interrupt() for
/// every valid slave in that list. Nothing is done if the session is not valid.
2271
2273{
2274 if (!IsValid()) return;
2275
// Map the selector onto the corresponding member list
2276 TList *slaves = 0;
2277 if (list == kAll) slaves = fSlaves;
2278 if (list == kActive) slaves = fActiveSlaves;
2279 if (list == kUnique) slaves = fUniqueSlaves;
2280 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2281
// NOTE(review): 'slaves' is dereferenced unchecked here; it is still null if
// 'list' matched none of the selectors above or if the selected member is null.
2282 if (slaves->GetSize() == 0) return;
2283
2284 TSlave *sl;
2285 TIter next(slaves);
2286
2287 while ((sl = (TSlave *)next())) {
2288 if (sl->IsValid()) {
2289
2290 // Ask slave to propagate the interrupt request
2291 sl->Interrupt((Int_t)type);
2292 }
2293 }
2294}
2295
2296////////////////////////////////////////////////////////////////////////////////
2297/// Returns number of slaves active in parallel mode. Returns 0 in case
2298/// there are no active slaves. Returns -1 in case of error.
/// The value is the sum of TSlave::GetParallel() over the active slaves;
/// negative per-slave values are not added. The only error condition checked
/// here is an invalid session.
2299
2301{
2302 if (!IsValid()) return -1;
2303
2304 // iterate over active slaves and return total number of slaves
2305 TIter nextSlave(GetListOfActiveSlaves());
2306 Int_t nparallel = 0;
// The dynamic_cast also ends the loop at the first entry which is not a TSlave
2307 while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2308 if (sl->GetParallel() >= 0)
2309 nparallel += sl->GetParallel();
2310
2311 return nparallel;
2312}
2313
2314////////////////////////////////////////////////////////////////////////////////
2315/// Returns list of TSlaveInfo's. In case of error return 0.
/// The list held in fSlaveInfo is refilled at each call with one TSlaveInfo
/// per TSlave of type kSlave, flagged kActive or kBad when its ordinal is found
/// in the active or bad list (kBad is set last, so it wins if both match).
/// A kPROOF_GETSLAVEINFO request is sent to every valid slave and sub-master
/// and the replies are collected before returning.
2316
2318{
2319 if (!IsValid()) return 0;
2320
2321 if (fSlaveInfo == 0) {
// NOTE(review): the body of this branch is absent from this listing;
// presumably it creates the list -- confirm against the full source.
2324 } else {
// Drop the entries left from a previous call
2325 fSlaveInfo->Delete();
2326 }
2327
// Despite its name, 'masters' receives every node (plain worker or sub-master)
// to which the request could be sent
2328 TList masters;
2329 TIter next(GetListOfSlaves());
2330 TSlave *slave;
2331
2332 while ((slave = (TSlave *) next()) != 0) {
2333 if (slave->GetSlaveType() == TSlave::kSlave) {
// In the IsLite() case the local host name is reported instead of the slave name
2334 const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2335 TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2336 name,
2337 slave->GetPerfIdx());
2338 fSlaveInfo->Add(slaveinfo);
2339
// Flag as active if the ordinal is in the list of active slaves
2340 TIter nextactive(GetListOfActiveSlaves());
2341 TSlave *activeslave;
2342 while ((activeslave = (TSlave *) nextactive())) {
2343 if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2344 slaveinfo->SetStatus(TSlaveInfo::kActive);
2345 break;
2346 }
2347 }
2348
// Flag as bad if the ordinal is in the list of bad slaves
2349 TIter nextbad(GetListOfBadSlaves());
2350 TSlave *badslave;
2351 while ((badslave = (TSlave *) nextbad())) {
2352 if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2353 slaveinfo->SetStatus(TSlaveInfo::kBad);
2354 break;
2355 }
2356 }
2357 // Get system info if supported
2358 if (slave->IsValid()) {
2359 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2360 MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2361 else
2362 masters.Add(slave);
2363 }
2364
2365 } else if (slave->GetSlaveType() == TSlave::kMaster) {
// Sub-masters get no TSlaveInfo entry here; they are only sent the request
2366 if (slave->IsValid()) {
2367 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2368 MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2369 else
2370 masters.Add(slave);
2371 }
2372 } else {
2373 Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2374 R__ASSERT(0);
2375 }
2376 }
// Wait for the answers of the nodes which were successfully contacted
2377 if (masters.GetSize() > 0) Collect(&masters);
2378
2379 return fSlaveInfo;
2380}
2381
2382////////////////////////////////////////////////////////////////////////////////
2383/// Activate slave server list.
/// All sockets in fAllMonitor are deactivated first; then the socket of each
/// valid slave in 'slaves' is activated. fActiveSlaves is used when 'slaves'
/// is null.
2384
2386{
// NOTE(review): fAllMonitor is used without a null check
2387 TMonitor *mon = fAllMonitor;
2388 mon->DeActivateAll();
2389
2390 slaves = !slaves ? fActiveSlaves : slaves;
2391
2392 TIter next(slaves);
2393 TSlave *sl;
2394 while ((sl = (TSlave*) next())) {
2395 if (sl->IsValid())
2396 mon->Activate(sl->GetSocket());
2397 }
2398}
2399
2400////////////////////////////////////////////////////////////////////////////////
2401/// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2402/// monitored by 'mon'.
/// fCurrentMonitor is used when 'mon' is null; nothing is done if that is
/// null as well.
2403
2405{
2406 TMonitor *m = (mon) ? mon : fCurrentMonitor;
2407 if (m) {
2408 if (on)
2409 m->ActivateAll();
2410 else
2411 m->DeActivateAll();
2412 }
2413}
2414
2415////////////////////////////////////////////////////////////////////////////////
2416/// Broadcast the group priority to all workers in the specified list. Returns
2417/// the number of workers the message was successfully sent to.
2418/// Returns -1 in case of error.
2419
2420Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2421{
2422 if (!IsValid()) return -1;
2423
2424 if (workers->GetSize() == 0) return 0;
2425
2426 int nsent = 0;
2427 TIter next(workers);
2428
2429 TSlave *wrk;
2430 while ((wrk = (TSlave *)next())) {
2431 if (wrk->IsValid()) {
2432 if (wrk->SendGroupPriority(grp, priority) == -1)
2433 MarkBad(wrk, "could not send group priority");
2434 else
2435 nsent++;
2436 }
2437 }
2438
2439 return nsent;
2440}
2441
2442////////////////////////////////////////////////////////////////////////////////
2443/// Broadcast the group priority to all workers in the specified list. Returns
2444/// the number of workers the message was successfully sent to.
2445/// Returns -1 in case of error.
2446
2447Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2448{
2449 TList *workers = 0;
2450 if (list == kAll) workers = fSlaves;
2451 if (list == kActive) workers = fActiveSlaves;
2452 if (list == kUnique) workers = fUniqueSlaves;
2453 if (list == kAllUnique) workers = fAllUniqueSlaves;
2454
2455 return BroadcastGroupPriority(grp, priority, workers);
2456}
2457
2458////////////////////////////////////////////////////////////////////////////////
2459/// Reset the merge progress notificator
2460
2462{
2464}
2465
2466////////////////////////////////////////////////////////////////////////////////
2467/// Broadcast a message to all slaves in the specified list. Returns
2468/// the number of slaves the message was successfully sent to.
2469/// Returns -1 in case of error.
/// A null or empty list yields 0. Slaves which are not valid are skipped and
/// those for which the send fails are passed to MarkBad(). The only error
/// condition checked here is an invalid session.
2470
2472{
2473 if (!IsValid()) return -1;
2474
2475 if (!slaves || slaves->GetSize() == 0) return 0;
2476
2477 int nsent = 0;
2478 TIter next(slaves);
2479
2480 TSlave *sl;
2481 while ((sl = (TSlave *)next())) {
2482 if (sl->IsValid()) {
// A failed send flags the slave as bad and is not counted
2483 if (sl->GetSocket()->Send(mess) == -1)
2484 MarkBad(sl, "could not broadcast request");
2485 else
2486 nsent++;
2487 }
2488 }
2489
2490 return nsent;
2491}
2492
2493////////////////////////////////////////////////////////////////////////////////
2494/// Broadcast a message to all slaves in the specified list (either
2495/// all slaves or only the active slaves). Returns the number of slaves
2496/// the message was successfully sent to. Returns -1 in case of error.
/// 'list' may also be kUnique or kAllUnique. The work is delegated to the
/// overload taking a list pointer, which receives null if 'list' matches none
/// of the four selectors.
2497
2499{
2500 TList *slaves = 0;
2501 if (list == kAll) slaves = fSlaves;
2502 if (list == kActive) slaves = fActiveSlaves;
2503 if (list == kUnique) slaves = fUniqueSlaves;
2504 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2505
2506 return Broadcast(mess, slaves);
2507}
2508
2509////////////////////////////////////////////////////////////////////////////////
2510/// Broadcast a character string buffer to all slaves in the specified
2511/// list. Use kind to set the TMessage what field. Returns the number of
2512/// slaves the message was sent to. Returns -1 in case of error.
2513
2514Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2515{
2516 TMessage mess(kind);
2517 if (str) mess.WriteString(str);
2518 return Broadcast(mess, slaves);
2519}
2520
2521////////////////////////////////////////////////////////////////////////////////
2522/// Broadcast a character string buffer to all slaves in the specified
2523/// list (either all slaves or only the active slaves). Use kind to
2524/// set the TMessage what field. Returns the number of slaves the message
2525/// was sent to. Returns -1 in case of error.
2526
2527Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2528{
2529 TMessage mess(kind);
2530 if (str) mess.WriteString(str);
2531 return Broadcast(mess, list);
2532}
2533
2534////////////////////////////////////////////////////////////////////////////////
2535/// Broadcast an object to all slaves in the specified list. Use kind to
2536/// set the TMessage what field. Returns the number of slaves the message
2537/// was sent to. Returns -1 in case of error.
/// The object is written into a TMessage of type 'kind', which is then handed
/// to the message-based Broadcast() together with 'slaves'.
2538
2540{
2541 TMessage mess(kind);
2542 mess.WriteObject(obj);
2543 return Broadcast(mess, slaves);
2544}
2545
2546////////////////////////////////////////////////////////////////////////////////
2547/// Broadcast an object to all slaves in the specified list. Use kind to
2548/// set the TMessage what field. Returns the number of slaves the message
2549/// was sent to. Returns -1 in case of error.
/// The object is written into a TMessage of type 'kind', which is then handed
/// to the message-based Broadcast() together with the selector 'list'.
2550
2552{
2553 TMessage mess(kind);
2554 mess.WriteObject(obj);
2555 return Broadcast(mess, list);
2556}
2557
2558////////////////////////////////////////////////////////////////////////////////
2559/// Broadcast a raw buffer of specified length to all slaves in the
2560/// specified list. Returns the number of slaves the buffer was sent to.
2561/// Returns -1 in case of error.
2562
2563Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2564{
2565 if (!IsValid()) return -1;
2566
2567 if (slaves->GetSize() == 0) return 0;
2568
2569 int nsent = 0;
2570 TIter next(slaves);
2571
2572 TSlave *sl;
2573 while ((sl = (TSlave *)next())) {
2574 if (sl->IsValid()) {
2575 if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2576 MarkBad(sl, "could not send broadcast-raw request");
2577 else
2578 nsent++;
2579 }
2580 }
2581
2582 return nsent;
2583}
2584
2585////////////////////////////////////////////////////////////////////////////////
2586/// Broadcast a raw buffer of specified length to all slaves in the
2587/// specified list. Returns the number of slaves the buffer was sent to.
2588/// Returns -1 in case of error.
2589
2590Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
2591{
2592 TList *slaves = 0;
2593 if (list == kAll) slaves = fSlaves;
2594 if (list == kActive) slaves = fActiveSlaves;
2595 if (list == kUnique) slaves = fUniqueSlaves;
2596 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2597
2598 return BroadcastRaw(buffer, length, slaves);
2599}
2600
2601////////////////////////////////////////////////////////////////////////////////
2602/// Broadcast file to all workers in the specified list. Returns the number of workers
2603/// the buffer was sent to.
2604/// Returns -1 in case of error.
2605
2606Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2607{
2608 if (!IsValid()) return -1;
2609
2610 if (wrks->GetSize() == 0) return 0;
2611
2612 int nsent = 0;
2613 TIter next(wrks);
2614
2615 TSlave *wrk;
2616 while ((wrk = (TSlave *)next())) {
2617 if (wrk->IsValid()) {
2618 if (SendFile(file, opt, rfile, wrk) < 0)
2619 Error("BroadcastFile",
2620 "problems sending file to worker %s (%s)",
2621 wrk->GetOrdinal(), wrk->GetName());
2622 else
2623 nsent++;
2624 }
2625 }
2626
2627 return nsent;
2628}
2629
2630////////////////////////////////////////////////////////////////////////////////
2631/// Broadcast file to all workers in the specified list. Returns the number of workers
2632/// the buffer was sent to.
2633/// Returns -1 in case of error.
2634
2635Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2636{
2637 TList *wrks = 0;
2638 if (list == kAll) wrks = fSlaves;
2639 if (list == kActive) wrks = fActiveSlaves;
2640 if (list == kUnique) wrks = fUniqueSlaves;
2641 if (list == kAllUnique) wrks = fAllUniqueSlaves;
2642
2643 return BroadcastFile(file, opt, rfile, wrks);
2644}
2645
2646////////////////////////////////////////////////////////////////////////////////
2647/// Release the monitor after use, making sure to delete newly created
2648/// monitors.
/// 'mon' is deleted only if it is not null and is none of the member
/// monitors fAllMonitor, fActiveMonitor, fUniqueMonitor and fAllUniqueMonitor;
/// otherwise nothing is done.
2649
2651{
2652 if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2653 && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2654 delete mon;
2655 }
2656}
2657
2658////////////////////////////////////////////////////////////////////////////////
2659/// Collect responses from slave sl. Returns the number of slaves that
2660/// responded (=1).
2661/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2662/// which means wait forever).
2663/// If defined (>= 0) endtype is the message that stops this collection.
/// Returns 0 without collecting if 'sl' is not valid. Otherwise only the
/// socket of 'sl' is activated in the monitor used, and the work is delegated
/// to the TMonitor-based Collect().
2664
2665Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2666{
2667 Int_t rc = 0;
2668
2669 TMonitor *mon = 0;
// NOTE(review): 'sl' itself is not checked against null before this call
2670 if (!sl->IsValid()) return 0;
2671
// NOTE(review): the line opening this if/else is absent from this listing.
// The visible branches either create a fresh TMonitor or reuse fAllMonitor
// after deactivating all its sockets.
2673 mon = new TMonitor;
2674 } else {
2675 mon = fAllMonitor;
2676 mon->DeActivateAll();
2677 }
2678 mon->Activate(sl->GetSocket());
2679
2680 rc = Collect(mon, timeout, endtype, deactonfail);
// Deletes 'mon' only if it is not one of the member monitors
2681 ReleaseMonitor(mon);
2682 return rc;
2683}
2684
2685////////////////////////////////////////////////////////////////////////////////
2686/// Collect responses from the slave servers. Returns the number of slaves
2687/// that responded.
2688/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2689/// which means wait forever).
2690/// If defined (>= 0) endtype is the message that stops this collection.
/// Only the sockets of the valid slaves in 'slaves' are activated in the
/// monitor used; the work is then delegated to the TMonitor-based Collect().
2691
2692Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2693{
2694 Int_t rc = 0;
2695
2696 TMonitor *mon = 0;
2697
// NOTE(review): the line opening this if/else is absent from this listing.
// The visible branches either create a fresh TMonitor or reuse fAllMonitor
// after deactivating all its sockets.
2699 mon = new TMonitor;
2700 } else {
2701 mon = fAllMonitor;
2702 mon->DeActivateAll();
2703 }
2704 TIter next(slaves);
2705 TSlave *sl;
2706 while ((sl = (TSlave*) next())) {
2707 if (sl->IsValid())
2708 mon->Activate(sl->GetSocket());
2709 }
2710
2711 rc = Collect(mon, timeout, endtype, deactonfail);
// Deletes 'mon' only if it is not one of the member monitors
2712 ReleaseMonitor(mon);
2713 return rc;
2714}
2715
2716////////////////////////////////////////////////////////////////////////////////
2717/// Collect responses from the slave servers. Returns the number of slaves
2718/// that responded.
2719/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2720/// which means wait forever).
2721/// If defined (>= 0) endtype is the message that stops this collection.
2722
2723Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2724{
2725 Int_t rc = 0;
2726 TMonitor *mon = 0;
2727
2728 if (list == kAll) mon = fAllMonitor;
2729 if (list == kActive) mon = fActiveMonitor;
2730 if (list == kUnique) mon = fUniqueMonitor;
2731 if (list == kAllUnique) mon = fAllUniqueMonitor;
2732 if (fCurrentMonitor == mon) {
2733 // Get a copy
2734 mon = new TMonitor(*mon);
2735 }
2736 mon->ActivateAll();
2737
2738 rc = Collect(mon, timeout, endtype, deactonfail);
2739 ReleaseMonitor(mon);
2740 return rc;
2741}
2742
2743////////////////////////////////////////////////////////////////////////////////
2744/// Collect responses from the slave servers. Returns the number of messages
2745/// received. Can be 0 if there are no active slaves.
2746/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2747/// which means wait forever).
2748/// If defined (>= 0) endtype is the message that stops this collection.
2749/// Collect also stops its execution from time to time to check for new
2750/// workers in Dynamic Startup mode.
/// The call can be nested: if fCurrentMonitor is already set on entry it is
/// saved and restored on exit.
/// NOTE(review): several statements of this function are absent from this
/// listing (the embedded line numbers have gaps); the comments added below
/// describe only what is visible.
2751
2752Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2753{
// Random tag, used in the debug printouts of this call
2754 Int_t collectId = gRandom->Integer(9999);
2755
2756 PDB(kCollect, 3)
2757 Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2758
2759 // Reset the status flag and clear the messages in the list, if any
2760 fStatus = 0;
2762
// Activity timeout: the Proof.SocketActivityTimeout setting multiplied by
// 1000 (default -1, i.e. -1000 after scaling)
2763 Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2764
// Nothing to do if no socket is active in the monitor
2765 if (!mon->GetActive(actto)) return 0;
2766
2768
2769 // Used by external code to know what we are monitoring
2770 TMonitor *savedMonitor = 0;
2771 if (fCurrentMonitor) {
// Nested call: remember the monitor of the collection already in progress
2772 savedMonitor = fCurrentMonitor;
2773 fCurrentMonitor = mon;
2774 } else {
// Outermost call: also reset the accumulated statistics
2775 fCurrentMonitor = mon;
2776 fBytesRead = 0;
2777 fRealTime = 0.0;
2778 fCpuTime = 0.0;
2779 }
2780
2781 // We want messages on the main window during synchronous collection,
2782 // but we save the present status to restore it at the end
2783 Bool_t saveRedirLog = fRedirLog;
2784 if (!IsIdle() && !IsSync())
2785 fRedirLog = kFALSE;
2786
// cnt: number of wake-ups processed without error (the return value);
// rc: result of the last CollectInputFrom() call
2787 int cnt = 0, rc = 0;
2788
2789 // Timeout counter
2790 Long_t nto = timeout;
2791 PDB(kCollect, 2)
2792 Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2793
2794 // On clients, handle Ctrl-C during collection
2795 if (fIntHandler)
2796 fIntHandler->Add();
2797
2798 // Sockets w/o activity during the last 'sto' millisecs are deactivated
2799 Int_t nact = 0;
2800 Long_t sto = -1;
2801 Int_t nsto = 60;
2802 Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2803 mon->ResetInterrupt();
// Loop while sockets are active and the timeout counter has not reached 0
// (a negative 'nto' never expires)
2804 while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2805
2806 // Dump last waiting sockets, if in debug mode
2807 PDB(kCollect, 2) {
2808 if (nact < 4) {
2809 TList *al = mon->GetListOfActives();
2810 if (al && al->GetSize() > 0) {
2811 Info("Collect"," %d node(s) still active:", al->GetSize());
2812 TIter nxs(al);
2813 TSocket *xs = 0;
2814 while ((xs = (TSocket *)nxs())) {
2815 TSlave *wrk = FindSlave(xs);
2816 if (wrk)
2817 Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2818 else
2819 Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2820 xs->GetInetAddress().GetPort());
2821 }
2822 }
2823 delete al;
2824 }
2825 }
2826
2827 // Preemptive poll for new workers on the master only in Dynamic Mode and only
2828 // during processing (TODO: should work on Top Master only)
// NOTE(review): the beginning of this condition and part of the block body
// are absent from this listing. The visible part requires that no poll was
// done yet or that at least 'pollint' seconds elapsed since the last one.
2830 ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2833 fLastPollWorkers_s = time(0);
2835 PDB(kCollect, 1)
2836 Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2837 }
2838
2839 // Wait for a ready socket
2840 PDB(kCollect, 3)
2841 Info("Collect", "Will invoke Select() #%04d", collectId);
// Below, a null result and (TSocket *)(-1) are handled as special cases;
// the latter decrements the timeout counter
2842 TSocket *s = mon->Select(1000);
2843
2844 if (s && s != (TSocket *)(-1)) {
2845 // Get and analyse the info it did receive
2846 rc = CollectInputFrom(s, endtype, deactonfail);
2847 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2848 // Deactivate it if we are done with it
2849 mon->DeActivate(s);
// NOTE(review): 'al' is dereferenced below without a null check, unlike
// the other uses of GetListOfActives() in this function; the list is
// also fetched when the debug printout is disabled
2850 TList *al = mon->GetListOfActives();
2851 PDB(kCollect, 2)
2852 Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2853 s, mon->GetActive(),
2854 al->First());
2855 delete al;
2856 } else if (rc == 2) {
2857 // This end message was for the saved monitor
2858 // Deactivate it if we are done with it
2859 if (savedMonitor) {
2860 savedMonitor->DeActivate(s);
// NOTE(review): the list printed here is taken from 'mon' although the
// message refers to 'savedMonitor' -- confirm this is intended
2861 TList *al = mon->GetListOfActives();
2862 PDB(kCollect, 2)
2863 Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2864 s, savedMonitor->GetActive(),
2865 al->First());
2866 delete al;
2867 }
2868 }
2869
2870 // Update counter (if no error occurred)
2871 if (rc >= 0)
2872 cnt++;
2873 } else {
2874 // If not timed-out, exit if not stopped or not aborted
2875 // (player exits status is finished in such a case); otherwise,
2876 // we still need to collect the partial output info
// NOTE(review): the statement following this 'if' is absent from this
// listing, so the scope of the DeActivateAll() call cannot be told here
2877 if (!s)
2879 mon->DeActivateAll();
2880 // Decrease the timeout counter if requested
2881 if (s == (TSocket *)(-1) && nto > 0)
2882 nto--;
2883 }
2884
2885 // Check if there are workers with ready output to be sent and ask the first to send it
// NOTE(review): the line opening this block is absent from this listing
2887 // Maximum number of concurrent sendings
2888 Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
// A non-zero return from GetParameter() makes the code fall back to the
// environment setting
2889 if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2890 mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2891 TIter nxwr(fWrksOutputReady);
2892 TSlave *wrk = 0;
// Each worker already asked, or asked now, consumes one of the 'mxws' slots
2893 while (mxws && (wrk = (TSlave *) nxwr())) {
2894 if (!wrk->TestBit(TSlave::kOutputRequested)) {
2895 // Ask worker for output
2896 TMessage sendoutput(kPROOF_SENDOUTPUT);
2897 PDB(kCollect, 2)
2898 Info("Collect", "worker %s was asked to send its output to master",
2899 wrk->GetOrdinal());
2900 if (wrk->GetSocket()->Send(sendoutput) != 1) {
2902 mxws--;
2903 }
2904 } else {
2905 // Count
2906 mxws--;
2907 }
2908 }
2909 }
2910
2911 // Check if we need to check the socket activity (we do it every 10 cycles ~ 10 sec)
// NOTE(review): the counter 'nsto' is initialised and reset to 60, not 10
2912 sto = -1;
2913 if (--nsto <= 0) {
2914 sto = (Long_t) actto;
2915 nsto = 60;
2916 }
2917
2918 } // end loop over active monitors
2919
2920 // If timed-out, deactivate the remaining sockets
2921 if (nto == 0) {
2922 TList *al = mon->GetListOfActives();
2923 if (al && al->GetSize() > 0) {
2924 // Notify the name of those which did timeout
2925 Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2926 TIter nxs(al);
2927 TSocket *xs = 0;
2928 while ((xs = (TSocket *)nxs())) {
2929 TSlave *wrk = FindSlave(xs);
2930 if (wrk)
2931 Info("Collect"," %s", wrk->GetName());
2932 else
2933 Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2934 xs->GetInetAddress().GetPort());
2935 }
2936 }
2937 delete al;
2938 mon->DeActivateAll();
2939 }
2940
2941 // Deactivate Ctrl-C special handler
// NOTE(review): the statement controlled by this 'if' is absent from this
// listing
2942 if (fIntHandler)
2944
2945 // make sure group view is up to date
2946 SendGroupView();
2947
2948 // Restore redirection setting
2949 fRedirLog = saveRedirLog;
2950
2951 // Restore the monitor
2952 fCurrentMonitor = savedMonitor;
2953
2955
2956 PDB(kCollect, 3)
2957 Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2958
2959 return cnt;
2960}
2961
2962////////////////////////////////////////////////////////////////////////////////
2963/// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2964/// them. Returns the number of new workers found, or <0 on errors.
/// The workers returned by gProofServ->GetWorkers() whose full ordinal
/// ("<ordinal of gProofServ>.<ordinal of the worker>") does not match any
/// slave in fSlaves are handed to AddWorkers(). -1 is returned if this is not
/// a master, if gProofServ is not set or if AddWorkers() fails.
2965
2967{
2968 // Requests for worker updates
2969 Int_t dummy = 0;
2970 TList *reqWorkers = new TList();
2971 reqWorkers->SetOwner(kFALSE);
2972
// NOTE(review): 'reqWorkers' is already allocated at this point and is not
// deleted on the two early returns below
2973 if (!TestBit(TProof::kIsMaster)) {
2974 Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2975 return -1;
2976 }
2977 if (!gProofServ) {
2978 Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2979 return -1;
2980 }
2981
2982 gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2983
2984 // List of new workers only (TProofNodeInfo)
2985 TList *newWorkers = new TList();
2986 newWorkers->SetOwner(kTRUE);
2987
2988 TIter next(reqWorkers);
2989 TProofNodeInfo *ni;
2990 TString fullOrd;
2991 while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2992
2993 // Form the full ordinal
2994 fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2995
// Look for a known slave with the same full ordinal
2996 TIter nextInner(fSlaves);
2997 TSlave *sl;
2998 Bool_t found = kFALSE;
2999 while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
3000 if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
3001 found = kTRUE;
3002 break;
3003 }
3004 }
3005
// 'reqWorkers' does not own its entries: those already known are deleted
// here, the others are moved to 'newWorkers', which owns them
3006 if (found) delete ni;
3007 else {
3008 newWorkers->Add(ni);
3009 PDB(kGlobal, 1)
3010 Info("PollForNewWorkers", "New worker found: %s:%s",
3011 ni->GetNodeName().Data(), fullOrd.Data());
3012 }
3013 }
3014
3015 delete reqWorkers; // not owner
3016
3017 Int_t nNewWorkers = newWorkers->GetEntries();
3018
3019 // Add the new workers
3020 if (nNewWorkers > 0) {
3021 PDB(kGlobal, 1)
3022 Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
3023 Int_t rv = AddWorkers(newWorkers);
// NOTE(review): 'newWorkers' is not deleted on this failure path; whether
// AddWorkers() has already disposed of it in that case cannot be told from
// here -- verify
3024 if (rv < 0) {
3025 Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
3026 return -1;
3027 }
3028 // Don't delete newWorkers: AddWorkers() will do that
3029 }
3030 else {
3031 PDB(kGlobal, 2)
3032 Info("PollForNewWorkers", "No new worker found");
3033 delete newWorkers;
3034 }
3035
3036 return nNewWorkers;
3037}
3038
3039////////////////////////////////////////////////////////////////////////////////
3040/// Remove links to objects in list 'ol' from gDirectory
/// gDirectory->RecursiveRemove() is called for every object in the list; the
/// list itself is not modified here. A null list is ignored.
3041
3043{
3044 if (ol) {
3045 TIter nxo(ol);
3046 TObject *o = 0;
3047 while ((o = nxo()))
3048 gDirectory->RecursiveRemove(o);
3049 }
3050}
3051
3052////////////////////////////////////////////////////////////////////////////////
3053/// Collect and analyze available input from socket s.
3054/// Returns 0 on success, -1 if any failure occurs.
/// More precisely, on success the value returned by HandleInputMessage() is
/// passed on, except that a 1 is turned into 2 when 'endtype' is defined
/// (>= 0) and differs from the type of the message received.
/// -1 is returned if Recv() fails or delivers no message.
3055
3057{
// Not initialised here; assigned by Recv() below
3058 TMessage *mess;
3059
3060 Int_t recvrc = 0;
3061 if ((recvrc = s->Recv(mess)) < 0) {
3062 PDB(kCollect,2)
3063 Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3064 Bool_t bad = kTRUE;
3065 if (recvrc == -5) {
3066 // Broken connection: try reconnection
// NOTE(review): some statements of this branch are absent from this
// listing; a successful Reconnect() (returning 0) avoids marking the
// socket as bad
3068 if (s->Reconnect() == 0) {
3070 bad = kFALSE;
3071 }
3072 }
3073 if (bad)
3074 MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3075 // Ignore this wake up
3076 return -1;
3077 }
3078 if (!mess) {
3079 // we get here in case the remote server died
3080 MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3081 return -1;
3082 }
3083 Int_t rc = 0;
3084
// Read the type before the message is handed over to HandleInputMessage()
3085 Int_t what = mess->What();
3086 TSlave *sl = FindSlave(s);
3087 rc = HandleInputMessage(sl, mess, deactonfail);
3088 if (rc == 1 && (endtype >= 0) && (what != endtype))
3089 // This message was for the base monitor in recursive case
3090 rc = 2;
3091
3092 // We are done successfully
3093 return rc;
3094}
3095
3096////////////////////////////////////////////////////////////////////////////////
3097/// Analyze the received message.
3098/// Returns 0 on success (1 if this is the last message from this socket), -1 if
3099/// any failure occurs.
3100
3102{
3103 char str[512];
3104 TObject *obj;
3105 Int_t rc = 0;
3106
3107 if (!mess || !sl) {
3108 Warning("HandleInputMessage", "given an empty message or undefined worker");
3109 return -1;
3110 }
3111 Bool_t delete_mess = kTRUE;
3112 TSocket *s = sl->GetSocket();
3113 if (!s) {
3114 Warning("HandleInputMessage", "worker socket is undefined");
3115 return -1;
3116 }
3117
3118 // The message type
3119 Int_t what = mess->What();
3120
3121 PDB(kCollect,3)
3122 Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3123
3124 switch (what) {
3125
3126 case kMESS_OK:
3127 // Add the message to the list
3128 fRecvMessages->Add(mess);
3129 delete_mess = kFALSE;
3130 break;
3131
3132 case kMESS_OBJECT:
3133 if (fPlayer) fPlayer->HandleRecvHisto(mess);
3134 break;
3135
3136 case kPROOF_FATAL:
3137 { TString msg;
3138 if ((mess->BufferSize() > mess->Length()))
3139 (*mess) >> msg;
3140 if (msg.IsNull()) {
3141 MarkBad(s, "received kPROOF_FATAL");
3142 } else {
3143 MarkBad(s, msg);
3144 }
3145 }
3147 // Finalize the progress dialog
3148 Emit("StopProcess(Bool_t)", kTRUE);
3149 }
3150 break;
3151
3152 case kPROOF_STOP:
3153 // Stop collection from this worker
3154 Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3155 sl->GetOrdinal());
3156 rc = 1;
3157 break;
3158
3160 // Add the message to the list
3161 fRecvMessages->Add(mess);
3162 delete_mess = kFALSE;
3163 rc = 1;
3164 break;
3165
3166 case kPROOF_TOUCH:
3167 // send a request for touching the remote admin file
3168 {
3169 sl->Touch();
3170 }
3171 break;
3172
3173 case kPROOF_GETOBJECT:
3174 // send slave object it asks for
3175 mess->ReadString(str, sizeof(str));
3176 obj = gDirectory->Get(str);
3177 if (obj)
3178 s->SendObject(obj);
3179 else
3180 s->Send(kMESS_NOTOK);
3181 break;
3182
3183 case kPROOF_GETPACKET:
3184 {
3185 PDB(kGlobal,2)
3186 Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3187 TDSetElement *elem = 0;
3188 elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3189
3190 if (elem != (TDSetElement*) -1) {
3192 answ << elem;
3193 s->Send(answ);
3194
3195 while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3196 TPair *p = (TPair*) fWaitingSlaves->First();
3197 s = (TSocket*) p->Key();
3198 TMessage *m = (TMessage*) p->Value();
3199
3200 elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3201 if (elem != (TDSetElement*) -1) {
3203 a << elem;
3204 s->Send(a);
3205 // remove has to happen via Links because TPair does not have
3206 // a Compare() function and therefore RemoveFirst() and
3207 // Remove(TObject*) do not work
3209 delete p;
3210 delete m;
3211 } else {
3212 break;
3213 }
3214 }
3215 } else {
3216 if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3217 fWaitingSlaves->Add(new TPair(s, mess));
3218 delete_mess = kFALSE;
3219 }
3220 }
3221 break;
3222
3223 case kPROOF_LOGFILE:
3224 {
3225 Int_t size;
3226 (*mess) >> size;
3227 PDB(kGlobal,2)
3228 Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3229 RecvLogFile(s, size);
3230 }
3231 break;
3232
3233 case kPROOF_LOGDONE:
3234 (*mess) >> sl->fStatus >> sl->fParallel;
3235 PDB(kCollect,2)
3236 Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3237 sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3238 if (sl->fStatus != 0) {
3239 // Return last nonzero status
3240 fStatus = sl->fStatus;
3241 // Deactivate the worker, if required
3242 if (deactonfail) DeactivateWorker(sl->fOrdinal);
3243 }
3244 // Remove from the workers-ready list
3248 }
3249 rc = 1;
3250 break;
3251
3252 case kPROOF_GETSTATS:
3253 {
3254 (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3255 >> sl->fWorkDir >> sl->fProofWorkDir;
3256 PDB(kCollect,2)
3257 Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3258 TString img;
3259 if ((mess->BufferSize() > mess->Length()))
3260 (*mess) >> img;
3261 // Set image
3262 if (img.IsNull()) {
3263 if (sl->fImage.IsNull())
3264 sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3265 sl->fProofWorkDir.Data());
3266 } else {
3267 sl->fImage = img;
3268 }
3269 PDB(kGlobal,2)
3270 Info("HandleInputMessage",
3271 "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3272
3273 fBytesRead += sl->fBytesRead;
3274 fRealTime += sl->fRealTime;
3275 fCpuTime += sl->fCpuTime;
3276 rc = 1;
3277 }
3278 break;
3279
3280 case kPROOF_GETPARALLEL:
3281 {
3282 Bool_t async = kFALSE;
3283 (*mess) >> sl->fParallel;
3284 if ((mess->BufferSize() > mess->Length()))
3285 (*mess) >> async;
3286 rc = (async) ? 0 : 1;
3287 }
3288 break;
3289
3290 case kPROOF_CHECKFILE:
3291 { // New servers (>= 5.22) send the status
3292 if ((mess->BufferSize() > mess->Length())) {
3293 (*mess) >> fCheckFileStatus;
3294 } else {
3295 // From old servers this meant success (failure was signaled with the
3296 // dangerous kPROOF_FATAL)
3297 fCheckFileStatus = 1;
3298 }
3299 rc = 1;
3300 }
3301 break;
3302
3303 case kPROOF_SENDFILE:
3304 { // New server: signals ending of sendfile operation
3305 rc = 1;
3306 }
3307 break;
3308
3310 {
3311 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3312 Int_t type = 0;
3313 (*mess) >> type;
3314 switch (type) {
3318 if (fEnabledPackages) {
3320 } else {
3321 Error("HandleInputMessage",
3322 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3323 }
3324 break;
3328 if (fAvailablePackages) {
3330 } else {
3331 Error("HandleInputMessage",
3332 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3333 }
3334 break;
3335 default:
3336 Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3337 }
3338 }
3339 break;
3340
3341 case kPROOF_SENDOUTPUT:
3342 {
3343 // We start measuring the merging time
3345
3346 // Worker is ready to send output: make sure the relevant bit is reset
3348 PDB(kGlobal,2)
3349 Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3350 // Create the list if not yet done
3351 if (!fWrksOutputReady) {
3352 fWrksOutputReady = new TList;
3354 }
3355 fWrksOutputReady->Add(sl);
3356 }
3357 break;
3358
3360 {
3361 // We start measuring the merging time
3363
3364 PDB(kGlobal,2)
3365 Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3366 Int_t type = 0;
3367 const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3369 Info("HandleInputMessage", "finalization on %s started ...", prefix);
3371 }
3372
3373 while ((mess->BufferSize() > mess->Length())) {
3374 (*mess) >> type;
3375 // If a query result header, add it to the player list
3376 if (fPlayer) {
3377 if (type == 0) {
3378 // Retrieve query result instance (output list not filled)
3379 TQueryResult *pq =
3381 if (pq) {
3382 // Add query to the result list in TProofPlayer
3385 // And clear the output list, as we start merging a new set of results
3386 if (fPlayer->GetOutputList())
3388 // Add the unique query tag as TNamed object to the input list
3389 // so that it is available in TSelectors for monitoring
3390 TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3391 if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3392 fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3393 fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3394 } else {
3395 Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3396 }
3397 } else if (type > 0) {
3398 // Read object
3399 TObject *o = mess->ReadObject(TObject::Class());
3400 // Increment counter on the client side
3402 TString msg;
3403 Bool_t changed = kFALSE;
3404 msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3405 if (gProofServ) {
3407 } else if (IsTty() || changed) {
3408 fprintf(stderr, "%s\r", msg.Data());
3409 }
3410 // Add or merge it
3411 if ((fPlayer->AddOutputObject(o) == 1)) {
3412 // Remove the object if it has been merged
3413 SafeDelete(o);
3414 }
3415 if (type > 1) {
3416 // Update the merger progress info
3418 if (TestBit(TProof::kIsClient) && !IsLite()) {
3419 // In PROOFLite this has to be done once only in TProofLite::Process
3421 if (pq) {
3423 // Add input objects (do not override remote settings, if any)
3424 TObject *xo = 0;
3425 TIter nxin(fPlayer->GetInputList());
3426 // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3427 if (!pq->GetInputList()) pq->SetInputList(new TList());
3428 while ((xo = nxin()))
3429 if (!pq->GetInputList()->FindObject(xo->GetName()))
3430 pq->AddInput(xo->Clone());
3431 // If the last object, notify the GUI that the result arrived
3432 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3433 }
3434 // Processing is over
3435 UpdateDialog();
3436 }
3437 }
3438 }
3439 } else {
3440 Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3441 }
3442 }
3443 }
3444 break;
3445
3446 case kPROOF_OUTPUTLIST:
3447 {
3448 // We start measuring the merging time
3449
3450 PDB(kGlobal,2)
3451 Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3452 TList *out = 0;
3453 if (fPlayer) {
3455 if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3456 out = (TList *) mess->ReadObject(TList::Class());
3457 } else {
3458 TQueryResult *pq =
3460 if (pq) {
3461 // Add query to the result list in TProofPlayer
3464 // To avoid accidental cleanups from anywhere else
3465 // remove objects from gDirectory and clone the list
3466 out = pq->GetOutputList();
3467 CleanGDirectory(out);
3468 out = (TList *) out->Clone();
3469 // Notify the GUI that the result arrived
3470 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3471 } else {
3472 PDB(kGlobal,2)
3473 Info("HandleInputMessage",
3474 "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3475 }
3476 }
3477 if (out) {
3478 out->SetOwner();
3479 fPlayer->AddOutput(out); // Incorporate the list
3480 SafeDelete(out);
3481 } else {
3482 PDB(kGlobal,2)
3483 Info("HandleInputMessage",
3484 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3485 }
3486 } else {
3487 Warning("HandleInputMessage",
3488 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3489 }
3490 // On clients at this point processing is over
3491 if (TestBit(TProof::kIsClient) && !IsLite())
3492 UpdateDialog();
3493 }
3494 break;
3495
3496 case kPROOF_QUERYLIST:
3497 {
3498 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3499 (*mess) >> fOtherQueries >> fDrawQueries;
3500 if (fQueries) {
3501 fQueries->Delete();
3502 delete fQueries;
3503 fQueries = 0;
3504 }
3505 fQueries = (TList *) mess->ReadObject(TList::Class());
3506 }
3507 break;
3508
3509 case kPROOF_RETRIEVE:
3510 {
3511 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3512 TQueryResult *pq =
3514 if (pq && fPlayer) {
3516 // Notify the GUI that the result arrived
3517 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3518 } else {
3519 PDB(kGlobal,2)
3520 Info("HandleInputMessage",
3521 "kPROOF_RETRIEVE: query result missing or player undefined");
3522 }
3523 }
3524 break;
3525
3526 case kPROOF_MAXQUERIES:
3527 {
3528 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3529 Int_t max = 0;
3530
3531 (*mess) >> max;
3532 Printf("Number of queries fully kept remotely: %d", max);
3533 }
3534 break;
3535
3537 {
3538 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3539
3540 UInt_t tot = 0, done = 0;
3541 TString action;
3542 Bool_t st = kTRUE;
3543
3544 (*mess) >> action >> tot >> done >> st;
3545
3547 if (tot) {
3548 TString type = (action.Contains("submas")) ? "submasters"
3549 : "workers";
3550 Int_t frac = (Int_t) (done*100.)/tot;
3551 char msg[512] = {0};
3552 if (frac >= 100) {
3553 snprintf(msg, 512, "%s: OK (%d %s) \n",
3554 action.Data(),tot, type.Data());
3555 } else {
3556 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3557 action.Data(), done, tot, frac);
3558 }
3559 if (fSync)
3560 fprintf(stderr,"%s", msg);
3561 else
3562 NotifyLogMsg(msg, 0);
3563 }
3564 // Notify GUIs
3565 StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3566 } else {
3567
3568 // Just send the message one level up
3570 m << action << tot << done << st;
3572 }
3573 }
3574 break;
3575
3577 {
3578 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3579
3580 UInt_t tot = 0, done = 0;
3581 TString action;
3582 Bool_t st = kTRUE;
3583
3584 (*mess) >> action >> tot >> done >> st;
3585
3587 if (tot) {
3588 TString type = "files";
3589 Int_t frac = (Int_t) (done*100.)/tot;
3590 char msg[512] = {0};
3591 if (frac >= 100) {
3592 snprintf(msg, 512, "%s: OK (%d %s) \n",
3593 action.Data(),tot, type.Data());
3594 } else {
3595 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3596 action.Data(), done, tot, frac);
3597 }
3598 if (fSync)
3599 fprintf(stderr,"%s", msg);
3600 else
3601 NotifyLogMsg(msg, 0);
3602 }
3603 // Notify GUIs
3604 DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3605 } else {
3606
3607 // Just send the message one level up
3609 m << action << tot << done << st;
3611 }
3612 }
3613 break;
3614
3616 {
3617 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3618
3619 // For Proof-Lite this variable is the number of workers and is set
3620 // by the player
3621 if (!IsLite()) {
3622 fNotIdle = 1;
3624 }
3625
3626 // Redirect the output, if needed
3628
3629 // The signal is used on masters by XrdProofdProtocol to catch
3630 // the start of processing; on clients it allows to update the
3631 // progress dialog
3632 if (!TestBit(TProof::kIsMaster)) {
3633
3634 // This is the end of preparation
3635 fQuerySTW.Stop();
3637 PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3638
3639 TString selec;
3640 Int_t dsz = -1;
3641 Long64_t first = -1, nent = -1;
3642 (*mess) >> selec >> dsz >> first >> nent;
3643 // Start or reset the progress dialog
3644 if (!gROOT->IsBatch()) {
3645 if (fProgressDialog &&
3648 fProgressDialog->ExecPlugin(5, this,
3649 selec.Data(), dsz, first, nent);
3651 } else {
3652 ResetProgressDialog(selec, dsz, first, nent);
3653 }
3654 }
3656 }
3657 }
3658 }
3659 break;
3660
3661 case kPROOF_ENDINIT:
3662 {
3663 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3664
3666 if (fPlayer)
3668 }
3669 }
3670 break;
3671
3672 case kPROOF_SETIDLE:
3673 {
3674 PDB(kGlobal,2)
3675 Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3676
3677 // The session is idle
3678 if (IsLite()) {
3679 if (fNotIdle > 0) {
3680 fNotIdle--;
3681 PDB(kGlobal,2)
3682 Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3683 } else {
3684 Warning("HandleInputMessage",
3685 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3686 sl->GetOrdinal());
3687 }
3688 } else {
3689 fNotIdle = 0;
3690 // Check if the query has been enqueued
3691 if ((mess->BufferSize() > mess->Length()))
3692 (*mess) >> fIsWaiting;
3693 }
3694 }
3695 break;
3696
3698 {
3699 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3700
3701 // We have received the sequential number
3702 (*mess) >> fSeqNum;
3703 Bool_t sync = fSync;
3704 if ((mess->BufferSize() > mess->Length()))
3705 (*mess) >> sync;
3706 if (sync != fSync && fSync) {
3707 // The server required to switch to asynchronous mode
3708 Activate();
3709 fSync = kFALSE;
3710 }
3711 DisableGoAsyn();
3712 // Check if the query has been enqueued
3713 fIsWaiting = kTRUE;
3714 // For Proof-Lite this variable is the number of workers and is set by the player
3715 if (!IsLite())
3716 fNotIdle = 1;
3717
3718 rc = 1;
3719 }
3720 break;
3721
3722 case kPROOF_SESSIONTAG:
3723 {
3724 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3725
3726 // We have received the unique tag and save it as name of this object
3727 TString stag;
3728 (*mess) >> stag;
3729 SetName(stag);
3730 // In the TSlave object
3731 sl->SetSessionTag(stag);
3732 // Server may have also sent the group
3733 if ((mess->BufferSize() > mess->Length()))
3734 (*mess) >> fGroup;
3735 // Server may have also sent the user
3736 if ((mess->BufferSize() > mess->Length())) {
3737 TString usr;
3738 (*mess) >> usr;
3739 if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3740 }
3741 }
3742 break;
3743
3744 case kPROOF_FEEDBACK:
3745 {
3746 PDB(kGlobal,2)
3747 Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3748 TList *out = (TList *) mess->ReadObject(TList::Class());
3749 out->SetOwner();
3750 if (fPlayer)
3751 fPlayer->StoreFeedback(sl, out); // Adopts the list
3752 else
3753 // Not yet ready: stop collect asap
3754 rc = 1;
3755 }
3756 break;
3757
3758 case kPROOF_AUTOBIN:
3759 {
3760 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3761
3762 TString name;
3763 Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3764
3765 (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3766
3768
3770
3771 answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3772
3773 s->Send(answ);
3774 }
3775 break;
3776
3777 case kPROOF_PROGRESS:
3778 {
3779 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3780
3781 if (GetRemoteProtocol() > 25) {
3782 // New format
3784 (*mess) >> pi;
3785 fPlayer->Progress(sl,pi);
3786 } else if (GetRemoteProtocol() > 11) {
3787 Long64_t total, processed, bytesread;
3788 Float_t initTime, procTime, evtrti, mbrti;
3789 (*mess) >> total >> processed >> bytesread
3790 >> initTime >> procTime
3791 >> evtrti >> mbrti;
3792 if (fPlayer)
3793 fPlayer->Progress(sl, total, processed, bytesread,
3794 initTime, procTime, evtrti, mbrti);
3795
3796 } else {
3797 // Old format
3798 Long64_t total, processed;
3799 (*mess) >> total >> processed;
3800 if (fPlayer)
3801 fPlayer->Progress(sl, total, processed);
3802 }
3803 }
3804 break;
3805
3806 case kPROOF_STOPPROCESS:
3807 {
3808 // This message is sent from a worker that finished processing.
3809 // We determine whether it was asked to finish by the
3810 // packetizer or stopped during processing a packet
3811 // (by TProof::RemoveWorkers() or by an external signal).
3812 // In the latter case call packetizer->MarkBad.
3813 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3814
3815 Long64_t events = 0;
3816 Bool_t abort = kFALSE;
3817 TProofProgressStatus *status = 0;
3818
3819 if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3820 (*mess) >> status >> abort;
3821 } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3822 (*mess) >> events >> abort;
3823 } else {
3824 (*mess) >> events;
3825 }
3826 if (fPlayer) {
3827 if (fProtocol > 18) {
3828 TList *listOfMissingFiles = 0;
3829 if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3830 listOfMissingFiles = new TList();
3831 listOfMissingFiles->SetName("MissingFiles");
3832 if (fPlayer)
3833 fPlayer->AddOutputObject(listOfMissingFiles);
3834 }
3835 if (fPlayer->GetPacketizer()) {
3836 Int_t ret =
3837 fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3838 if (ret > 0)
3839 fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3840 // This object is now owned by the packetizer
3841 status = 0;
3842 }
3843 if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3844 } else {
3845 fPlayer->AddEventsProcessed(events);
3846 }
3847 }
3848 SafeDelete(status);
3850 Emit("StopProcess(Bool_t)", abort);
3851 break;
3852 }
3853
3854 case kPROOF_SUBMERGER:
3855 {
3856 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3857 HandleSubmerger(mess, sl);
3858 }
3859 break;
3860
3862 {
3863 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3864
3865 Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3866 Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3867 TList* tmpinfo = 0;
3868 (*mess) >> tmpinfo;
3869 if (tmpinfo == 0) {
3870 Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3871 } else {
3872 tmpinfo->SetOwner(kFALSE);
3873 Int_t nentries = tmpinfo->GetSize();
3874 for (Int_t i=0; i<nentries; i++) {
3875 TSlaveInfo* slinfo =
3876 dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3877 if (slinfo) {
3878 // If PROOF-Lite
3879 if (IsLite()) slinfo->fHostName = gSystem->HostName();
3880 // Check if we have already a instance for this worker
3881 TIter nxw(fSlaveInfo);
3882 TSlaveInfo *ourwi = 0;
3883 while ((ourwi = (TSlaveInfo *)nxw())) {
3884 if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3885 ourwi->SetSysInfo(slinfo->GetSysInfo());
3886 ourwi->fHostName = slinfo->GetName();
3887 if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3888 ourwi->fDataDir = slinfo->GetDataDir();
3889 break;
3890 }
3891 }
3892 if (!ourwi) {
3893 fSlaveInfo->Add(slinfo);
3894 } else {
3895 slinfo = ourwi;
3896 }
3897 if (slinfo->fStatus != TSlaveInfo::kBad) {
3898 if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3899 if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3900 }
3901 if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3902 slinfo->fMsd = sl->GetMsd();
3903 }
3904 }
3905 delete tmpinfo;
3906 rc = 1;
3907 }
3908 }
3909 break;
3910
3912 {
3913 PDB(kGlobal,2)
3914 Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3915 TDSet* dset = 0;
3916 (*mess) >> dset;
3917 if (!fDSet)
3918 Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3919 else
3920 fDSet->Validate(dset);
3921 delete dset;
3922 }
3923 break;
3924
3925 case kPROOF_DATA_READY:
3926 {
3927 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3928 Bool_t dataready = kFALSE;
3929 Long64_t totalbytes, bytesready;
3930 (*mess) >> dataready >> totalbytes >> bytesready;
3931 fTotalBytes += totalbytes;
3932 fBytesReady += bytesready;
3933 if (dataready == kFALSE) fDataReady = dataready;
3934 }
3935 break;
3936
3937 case kPROOF_PING:
3938 // do nothing (ping is already acknowledged)
3939 break;
3940
3941 case kPROOF_MESSAGE:
3942 {
3943 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3944
3945 // Read the message text and, if present, the line-feed flag
3946 TString msg;
3947 (*mess) >> msg;
3948 Bool_t lfeed = kTRUE;
3949 if ((mess->BufferSize() > mess->Length()))
3950 (*mess) >> lfeed;
3951
3953
3954 if (fSync) {
3955 // Notify locally
3956 fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3957 } else {
3958 // Notify locally taking care of redirection, windows logs, ...
3959 NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3960 }
3961 } else {
3962
3963 // The message is logged for debugging purposes.
3964 fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3965 if (gProofServ) {
3966 // We hide it during normal operations
3968
3969 // And send the message one level up
3970 gProofServ->SendAsynMessage(msg, lfeed);
3971 }
3972 }
3973 }
3974 break;
3975
3977 {
3978 TString vac;
3979 (*mess) >> vac;
3980 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3981 Int_t from = 0;
3982 TString vers, archcomp;
3983 if (vac.Tokenize(vers, from, "|"))
3984 vac.Tokenize(archcomp, from, "|");
3985 sl->SetArchCompiler(archcomp);
3986 vers.ReplaceAll(":","|");
3987 sl->SetROOTVersion(vers);
3988 }
3989 break;
3990
3991 default:
3992 {
3993 Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3994 sl->GetOrdinal(), what);
3995 }
3996 break;
3997 }
3998
3999 // Cleanup
4000 if (delete_mess)
4001 delete mess;
4002
4003 // We are done successfully
4004 return rc;
4005}
4006
4007////////////////////////////////////////////////////////////////////////////////
4008/// Process a message of type kPROOF_SUBMERGER
///
/// The first Int_t read from 'mess' is the message sub-type. The switch below
/// handles kOutputSent, kMergerDown and kOutputSize; no default branch is
/// visible, so other values are ignored. 'sl' is the worker the message refers
/// to and 's' is its socket, used for the replies sent from here.
/// NOTE(review): the signature line is missing from this listing; the names
/// 'mess' and 'sl' are the ones used in the body.
4009
4011{
4012 // Message sub-type
4013 Int_t type = 0;
4014 (*mess) >> type;
4015 TSocket *s = sl->GetSocket();
4016
4017 switch (type) {
4018 case kOutputSent:
4019 {
 // A worker reports that it has sent its output to merger 'merger_id';
 // only handled when IsEndMaster() is true
4020 if (IsEndMaster()) {
4021 Int_t merger_id = -1;
4022 (*mess) >> merger_id;
4023
4024 PDB(kSubmerger, 2)
4025 Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4026 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4027
 // NOTE(review): only the upper bound of merger_id is checked here; a
 // negative id would reach fMergers->At() - confirm this cannot happen.
 // NOTE(review): the error text below is tagged "kOutputSize" although
 // this is the kOutputSent branch - looks like a copy/paste slip.
4028 if (!fMergers || fMergers->GetSize() <= merger_id) {
4029 Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4030 break;
4031 }
4032 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4033 mi->SetMergedWorker();
 // When this merger has all its workers merged it is deactivated; when
 // no active merger is left the merger list is cleared and deleted and
 // fMergersCount is reset to -1
4034 if (mi->AreAllWorkersMerged()) {
4035 mi->Deactivate();
4036 if (GetActiveMergersCount() == 0) {
4037 fMergers->Clear();
4038 delete fMergers;
 // NOTE(review): listing lines 4039 and 4041 are missing here
4040 fMergersCount = -1;
4042 PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4043 }
4044 }
4045 } else {
4046 PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4047 }
4048 }
4049 break;
4050
4051 case kMergerDown:
4052 {
 // Merger 'merger_id' is reported down: deactivate it and ask the merger
 // node and all workers assigned to it to send their output again
4053 Int_t merger_id = -1;
4054 (*mess) >> merger_id;
4055
4056 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4057
 // NOTE(review): as above, a negative merger_id is not rejected by this test
4058 if (!fMergers || fMergers->GetSize() <= merger_id) {
4059 Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4060 break;
4061 }
4062
4063 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
 // Nothing to do if the merger was already deactivated
4064 if (!mi->IsActive()) {
4065 break;
4066 } else {
4067 mi->Deactivate();
4068 }
4069
4070 // Stop the invalid merger in the case it is still listening
 // NOTE(review): the declaration of 'stop' (listing line 4071) is missing
 // from this listing; presumably a TMessage of type kPROOF_SUBMERGER
4072 stop << Int_t(kStopMerging);
4073 stop << 0;
4074 s->Send(stop);
4075
4076 // Ask for results from merger (only original results from this node as worker are returned)
4077 AskForOutput(mi->GetMerger());
4078
4079 // Ask for results from all workers assigned to this merger
4080 TIter nxo(mi->GetWorkers());
4081 TObject * o = 0;
4082 while ((o = nxo())) {
4083 AskForOutput((TSlave *)o);
4084 }
4085 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4086 }
4087 break;
4088
4089 case kOutputSize:
4090 {
 // A worker reports the number of its output objects and the port it can
 // use for merging. The first such message (fMergersSet not yet true)
 // decides how many mergers are used; later ones either create a further
 // merger or redirect the worker to an existing one
4091 if (IsEndMaster()) {
4092 PDB(kSubmerger, 2)
4093 Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4094
4095 const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4096 if (!fFinalizationRunning) {
4097 Info("HandleSubmerger", "finalization on %s started ...", prefix);
4099 }
4100
4101 Int_t output_size = 0;
4102 Int_t merging_port = 0;
4103 (*mess) >> output_size >> merging_port;
4104
4105 PDB(kSubmerger, 2) Info("HandleSubmerger",
4106 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4107 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4108 TString msg;
4109 if (!fMergersSet) {
4110
 // NOTE(review): 'activeWorkers' is used below but its declaration
 // (listing line 4111) is missing from this listing
4112
4113 // First pass - setting number of mergers according to user or dynamically
4114 fMergersCount = -1; // No mergers used if not set by user
4115 TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4116 if (mc) fMergersCount = mc->GetVal(); // Value set by user
4117 TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4118 if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4119
4120 // Mergers count specified by user but not valid
 // (negative, or more than one merger per two workers): notify and fall
 // back to 0, which selects the dynamic / by-host setting below
4121 if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4122 msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4123 prefix, fMergersCount, activeWorkers);
4124 if (gProofServ)
4126 else
4127 Printf("%s",msg.Data());
4128 fMergersCount = 0;
4129 }
4130 // Mergers count will be set dynamically
4131 if ((fMergersCount == 0) && (!fMergersByHost)) {
 // Use about sqrt(activeWorkers) mergers; if rounding to nearest leaves
 // fewer than two workers per merger, use the truncated square root
4132 if (activeWorkers > 1) {
4133 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4134 if (activeWorkers / fMergersCount < 2)
4135 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4136 }
4137 if (fMergersCount > 1)
4138 msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4139 prefix, fMergersCount, activeWorkers);
4140 else {
4141 msg.Form("%s: No mergers will be used for %d workers",
4142 prefix, activeWorkers);
4143 fMergersCount = -1;
4144 }
4145 if (gProofServ)
4147 else
4148 Printf("%s",msg.Data());
4149 } else if (fMergersByHost) {
4150 // We force mergers at host level to minimize network traffic
 // The count is the number of distinct names (GetName()) in fSlaves
4151 if (activeWorkers > 1) {
4152 fMergersCount = 0;
4153 THashList hosts;
4154 TIter nxwk(fSlaves);
4155 TObject *wrk = 0;
4156 while ((wrk = nxwk())) {
4157 if (!hosts.FindObject(wrk->GetName())) {
4158 hosts.Add(new TObjString(wrk->GetName()));
4159 fMergersCount++;
4160 }
4161 }
4162 }
4163 if (fMergersCount > 1)
4164 msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4165 prefix, fMergersCount, activeWorkers);
4166 else {
4167 msg.Form("%s: No mergers will be used for %d workers",
4168 prefix, activeWorkers);
4169 fMergersCount = -1;
4170 }
4171 if (gProofServ)
4173 else
4174 Printf("%s",msg.Data());
4175 } else {
4176 msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4177 prefix, fMergersCount, activeWorkers);
4178 if (gProofServ)
4180 else
4181 Printf("%s",msg.Data());
4182 }
4183
4184 // We started merging; we call it here because fMergersCount is still the original number
4185 // and can be saved internally
4187
4188 // Update merger counters (new workers are not yet active)
4190
4191 if (fMergersCount > 0) {
4192
4193 fMergers = new TList();
4195 // Total number of workers, which will not act as mergers ('pure workers')
4196 fWorkersToMerge = (activeWorkers - fMergersCount);
4197 // Establish the first merger
4198 if (!CreateMerger(sl, merging_port)) {
4199 // Cannot establish first merger
4200 AskForOutput(sl);
4202 fMergersCount--;
4203 }
4205 } else {
4206 AskForOutput(sl);
4207 }
4209 } else {
4210 // Multiple pass
4211 if (fMergersCount == -1) {
4212 // No mergers. Workers send their outputs directly to master
4213 AskForOutput(sl);
4214 } else {
 // While fRedirectNext is positive (and mergers are not per host) the
 // reporting worker is redirected to an existing merger
4215 if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4216 RedirectWorker(s, sl, output_size);
4217 fRedirectNext--;
4218 } else {
 // With per-host mergers a new merger is created only if no existing
 // merger has the same name (GetName()) as this worker
4219 Bool_t newMerger = kTRUE;
4220 if (fMergersByHost) {
4221 TIter nxmg(fMergers);
4222 TMergerInfo *mgi = 0;
4223 while ((mgi = (TMergerInfo *) nxmg())) {
4224 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4225 newMerger = kFALSE;
4226 break;
4227 }
4228 }
4229 }
4230 if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4231 // Still not enough mergers established
4232 if (!CreateMerger(sl, merging_port)) {
4233 // Cannot establish a merger
4234 AskForOutput(sl);
4236 fMergersCount--;
4237 }
4238 } else
4239 RedirectWorker(s, sl, output_size);
4240 }
4241 }
4242 }
4243 } else {
4244 Error("HandleSubMerger","kOutputSize received not on endmaster!");
4245 }
4246 }
4247 break;
4248 }
4249}
4250
4251////////////////////////////////////////////////////////////////////////////////
4252/// Redirect output of worker sl to some merger
///
/// With fMergersByHost set, the merger whose name (GetName()) equals that of
/// 'sl' is selected; otherwise the index returned by FindNextFreeMerger() is
/// used. If no merger is found the worker is asked to send its output to the
/// master via AskForOutput(). Otherwise a kPROOF_SUBMERGER / kSendOutput
/// message carrying the merger index, host name and port is sent on socket
/// 's', and 'output_size' and the worker are recorded on the merger.
/// NOTE(review): the signature line is missing from this listing; 's', 'sl'
/// and 'output_size' are the names used in the body.
4253
4255{
4256 Int_t merger_id = -1;
4257
 // NOTE(review): fMergers is dereferenced in this loop before the null check
 // performed further down - confirm callers guarantee it is set
4258 if (fMergersByHost) {
4259 for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4260 TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4261 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4262 merger_id = i;
4263 break;
4264 }
4265 }
4266 } else {
4267 merger_id = FindNextFreeMerger();
4268 }
4269
4270 if (merger_id == -1) {
4271 // No free merger (probably it had crashed before)
4272 AskForOutput(sl);
4273 } else {
4274 TMessage sendoutput(kPROOF_SUBMERGER);
4275 sendoutput << Int_t(kSendOutput);
4276 PDB(kSubmerger, 2)
4277 Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4278
4279 PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4280 if (!fMergers || fMergers->GetSize() <= merger_id) {
4281 Error("RedirectWorker", "#%d not in list ", merger_id);
4282 return;
4283 }
4284 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4285
 // In PROOF-Lite the merger is addressed as "localhost"
4286 TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName();
4287 sendoutput << merger_id;
4288 sendoutput << hname;
4289 sendoutput << mi->GetPort();
4290 s->Send(sendoutput);
 // Book-keeping on the merger: expected objects and assigned worker
4291 mi->AddMergedObjects(output_size);
4292 mi->AddWorker(sl);
4293 }
4294}
4295
4296////////////////////////////////////////////////////////////////////////////////
4297/// Return a merger, which is both active and still accepts some workers to be
4298/// assigned to it. It works on the 'round-robin' basis.
///
/// The value returned is an index (fLastAssignedMerger, post-incremented), or
/// -1 when the second scan does not find a suitable merger.
/// NOTE(review): this listing is incomplete - the signature, both loop bodies
/// and the 'if' lines opening the two branches (listing lines 4300, 4305,
/// 4308-4309, 4317 and 4320) are missing, so the exact wrap-around logic cannot
/// be confirmed from here.
4299
4301{
 // First scan: the loop condition skips mergers that are inactive or have all
 // their workers assigned, starting at fLastAssignedMerger
4302 while (fLastAssignedMerger < fMergers->GetSize() &&
4303 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4304 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4306 }
4307
4310 } else {
4311 return fLastAssignedMerger++;
4312 }
4313
 // Second scan with the same condition; presumably restarted from the
 // beginning of the list by the missing branch above - TODO confirm
4314 while (fLastAssignedMerger < fMergers->GetSize() &&
4315 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4316 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4318 }
4319
4321 return -1;
4322 } else {
4323 return fLastAssignedMerger++;
4324 }
4325}
4326
4327////////////////////////////////////////////////////////////////////////////////
4328/// Master asks for output from worker sl
///
/// Sends a kPROOF_SUBMERGER message of sub-type kSendOutput on the worker's
/// socket, followed by -1, the string "master" and -1.
/// NOTE(review): the three payload fields presumably stand for merger id, host
/// and port, with -1 / "master" meaning "send directly to the master" - confirm
/// against the worker-side reader. The signature line and listing line 4343
/// are missing from this listing.
4329
4331{
4332 TMessage sendoutput(kPROOF_SUBMERGER);
4333 sendoutput << Int_t(kSendOutput);
4334
4335 PDB(kSubmerger, 2) Info("AskForOutput",
4336 "worker %s was asked to send its output to master",
4337 sl->GetOrdinal());
4338
4339 sendoutput << -1;
4340 sendoutput << TString("master");
4341 sendoutput << -1;
4342 sl->GetSocket()->Send(sendoutput);
4344}
4345
4346////////////////////////////////////////////////////////////////////////////////
4347/// Final update of the progress dialog
///
/// Does nothing when no player is defined. In the abort and stop branches the
/// number of processed events is reported through Progress() and
/// "StopProcess(Bool_t)" is emitted (kTRUE for abort, kFALSE for stop). In all
/// cases a final "Progress(...)" signal with every argument set to -1 is
/// emitted; the signal signature used depends on GetRemoteProtocol().
/// NOTE(review): the signature line and the conditions opening the abort and
/// stop branches are missing from this listing.
4348
4350{
4351 if (!fPlayer) return;
4352
4353 // Handle abort ...
 // NOTE(review): the 'if' opening this branch (listing line 4354) is missing
4355 if (fSync)
4356 Info("UpdateDialog",
4357 "processing was aborted - %lld events processed",
4359
4360 if (GetRemoteProtocol() > 11) {
4361 // New format
4362 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4363 } else {
4365 }
4366 Emit("StopProcess(Bool_t)", kTRUE);
4367 }
4368
4369 // Handle stop ...
 // NOTE(review): the 'if' opening this branch (listing line 4370) is missing
4371 if (fSync)
4372 Info("UpdateDialog",
4373 "processing was stopped - %lld events processed",
4375
4376 if (GetRemoteProtocol() > 25) {
4377 // New format
4378 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4379 } else if (GetRemoteProtocol() > 11) {
4380 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4381 } else {
4383 }
4384 Emit("StopProcess(Bool_t)", kFALSE);
4385 }
4386
4387 // Final update of the dialog box
 // Three signal signatures are supported, selected by the remote protocol
 // version: 10 arguments (> 25), 7 arguments (> 11) or 2 arguments
4388 if (GetRemoteProtocol() > 25) {
4389 // New format
4390 EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4391 10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4392 (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4393 } else if (GetRemoteProtocol() > 11) {
4394 // New format
4395 EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4396 7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4397 (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4398 } else {
4399 EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4400 }
4401}
4402
4403////////////////////////////////////////////////////////////////////////////////
4404/// Activate the a-sync input handler.
///
/// Iterates over fSlaves and calls Add() on the input handler of every entry
/// that has one; entries without an input handler are left untouched.
/// NOTE(review): the signature line is missing from this listing.
4405
4407{
4408 TIter next(fSlaves);
4409 TSlave *sl;
4410
4411 while ((sl = (TSlave*) next()))
4412 if (sl->GetInputHandler())
4413 sl->GetInputHandler()->Add();
4414}
4415
4416////////////////////////////////////////////////////////////////////////////////
4417/// De-activate a-sync input handler.
///
/// Iterates over fSlaves and calls Remove() on the input handler of every
/// entry that has one; entries without an input handler are left untouched.
/// NOTE(review): the signature line is missing from this listing.
4418
4420{
4421 TIter next(fSlaves);
4422 TSlave *sl;
4423
4424 while ((sl = (TSlave*) next()))
4425 if (sl->GetInputHandler())
4426 sl->GetInputHandler()->Remove();
4427}
4428
4429////////////////////////////////////////////////////////////////////////////////
4430/// Get the active mergers count
///
/// Returns the number of entries in fMergers for which IsActive() is true, or
/// 0 when fMergers is not set.
/// NOTE(review): the signature line is missing from this listing.
4431
4433{
 // No merger list: nothing can be active
4434 if (!fMergers) return 0;
4435
4436 Int_t active_mergers = 0;
4437
4438 TIter mergers(fMergers);
4439 TMergerInfo *mi = 0;
4440 while ((mi = (TMergerInfo *)mergers())) {
4441 if (mi->IsActive()) active_mergers++;
4442 }
4443
4444 return active_mergers;
4445}
4446
4447////////////////////////////////////////////////////////////////////////////////
4448/// Create a new merger
///
/// Turns worker 'sl' into a merger listening on 'port': computes how many
/// additional workers it will serve, sends it a kPROOF_SUBMERGER / kBeMerger
/// message with its index in fMergers and that worker count, and appends a new
/// TMergerInfo to fMergers.
/// Returns kFALSE without side effects (other than debug output) when
/// 'port' <= 0, kTRUE otherwise.
/// NOTE(review): the signature line is missing from this listing; fMergers is
/// used without a null check, so callers are expected to have created it.
4449
4451{
4452 PDB(kSubmerger, 2)
4453 Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4454
4455 PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4456
4457 if (port <= 0) {
4458 PDB(kSubmerger,2)
4459 Info("CreateMerger", "cannot create merger on port %d - exit", port);
4460 return kFALSE;
4461 }
4462
4463 Int_t workers = -1;
4464 if (!fMergersByHost) {
 // Share the remaining pure workers among the mergers still to be created.
 // NOTE(review): mergersToCreate is used as a divisor; this relies on the
 // callers only getting here while fMergersCount > fMergers->GetSize()
4465 Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4466 // Number of pure workers, which are not simply divisible by mergers
4467 Int_t rest = fWorkersToMerge % mergersToCreate;
4468 // We add one more worker for each of the first 'rest' mergers being established
4469 if (rest > 0 && fMergers->GetSize() < rest) {
4470 rest = 1;
4471 } else {
4472 rest = 0;
4473 }
4474 workers = (fWorkersToMerge / mergersToCreate) + rest;
4475 } else {
 // Per-host mergers: serve every other active worker with the same name
 // (GetName()) as 'sl'; the merger itself is excluded by the -1
4476 Int_t workersOnHost = 0;
4477 for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4478 if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4479 }
4480 workers = workersOnHost - 1;
4481 }
4482
4483 TString msg;
4484 msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4485
4486 if (gProofServ) {
4488 } else {
4489 Printf("%s",msg.Data());
4490 }
4491 TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4492
 // The merger id sent to the worker is its future index in fMergers (the list
 // size before the Add() below)
4493 TMessage bemerger(kPROOF_SUBMERGER);
4494 bemerger << Int_t(kBeMerger);
4495 bemerger << fMergers->GetSize();
4496 bemerger << workers;
4497 sl->GetSocket()->Send(bemerger);
4498
4499 PDB(kSubmerger,2) Info("CreateMerger",
4500 "merger #%d (port: %d) for %d workers started",
4501 fMergers->GetSize(), port, workers);
4502
4503 fMergers->Add(merger);
4504 fWorkersToMerge = fWorkersToMerge - workers;
4505
 // Half of this merger's workers are flagged for redirection; fRedirectNext is
 // consumed by the kOutputSize handling in HandleSubmerger
4506 fRedirectNext = workers / 2;
4507
4508 PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4509 return kTRUE;
4510}
4511
4512////////////////////////////////////////////////////////////////////////////////
4513/// Add a bad slave server to the bad slave list and remove it from
4514/// the active list and from the two monitor objects. Assume that the work
4515/// done by this worker was lost and ask the packetizer to reassign it.
4516
4517void TProof::MarkBad(TSlave *wrk, const char *reason)
4518{
 // Takes worker 'wrk' out of the active set of this session and reports why.
 // 'reason' may be null; it is compared with kPROOF_TerminateWorker and
 // kPROOF_WorkerIdleTO to choose between the verbose "unexpected failure"
 // handling and the two quieter paths further down.
 // NOTE(review): this text comes from a numbered listing in which some listing
 // lines are absent (4564, 4600, 4609, 4612, 4614, 4625-4626, 4634, 4642-4644,
 // 4653, 4664). Whatever statements were on those lines are not visible here.

 // fCloseMutex is held until the function returns.
4519 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4520
4521 // We may have been invalidated in the meanwhile: nothing to do in such a case
4522 if (!IsValid()) return;
4523
 // A null worker is reported as an error and otherwise ignored.
4524 if (!wrk) {
4525 Error("MarkBad", "worker instance undefined: protocol error? ");
4526 return;
4527 }
4528
4529 // Local URL
 // Filled in only while the string is still empty; since it is a function-local
 // static, a non-empty value is reused by later calls.
 // On a master: FQDN of the local host, plus ":port" when ProofServ.XpdPort is
 // set to a positive value. Otherwise: user@host:port taken from fUrl.
4530 static TString thisurl;
4531 if (thisurl.IsNull()) {
4532 if (IsMaster()) {
4533 Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4534 thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4535 if (port > 0) thisurl += TString::Format(":%d", port);
4536 } else {
4537 thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4538 }
4539 }
4540
 // Verbose branch: taken when no reason is given, or when the reason differs
 // from both kPROOF_TerminateWorker and kPROOF_WorkerIdleTO.
4541 if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4542 // Message for notification
4543 const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4544 TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4545 TString msg;
 // A null or empty reason is shown as "unknown".
4546 msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4547 src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4548 (reason && strlen(reason)) ? reason : "unknown");
4549 Info("MarkBad", "%s", msg.Data());
4550 // Notify one level up, if the case
4551 // Add some hint for diagnostics
 // The hint names the worker only when running inside a server (gProofServ set).
4552 if (gProofServ) {
4553 msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4554 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4555 } else {
4556 msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4557 }
4558 msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4559 msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4560 msg += TString::Format(" +++\n");
4561 if (gProofServ) {
4562 msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4563 "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
 // NOTE(review): listing line 4564 is absent; what is done with 'msg' in this
 // branch is not visible here.
4565 } else {
4566 msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4567 "Display(\"*\")\n\n", thisurl.Data());
 // Without gProofServ the full message is printed locally.
4568 Printf("%s", msg.Data());
4569 }
4570 } else if (reason) {
 // Here the reason equals one of the two constants; the extra strcmp limits
 // the debug message to the kPROOF_TerminateWorker case.
4571 if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4572 Info("MarkBad", "worker %s at %s:%d asked to terminate",
4573 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4574 }
4575 }
4576
4577 if (IsMaster() && reason) {
4578 if (strcmp(reason, kPROOF_TerminateWorker)) {
4579 // if the reason was not a planned termination
 // Look up the "MissingFiles" output list, creating it when absent.
 // NOTE(review): when fPlayer is null the newly created list is neither
 // registered as output nor handed to a packetizer (which is then null too),
 // and no delete is visible in this block; confirm it is not leaked.
4580 TList *listOfMissingFiles = 0;
4581 if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4582 listOfMissingFiles = new TList();
4583 listOfMissingFiles->SetName("MissingFiles");
4584 if (fPlayer)
4585 fPlayer->AddOutputObject(listOfMissingFiles);
4586 }
4587 // If a query is being processed, assume that the work done by
4588 // the worker was lost and needs to be reassigned.
4589 TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4590 if (packetizer) {
4591 // the worker was lost so do resubmit the packets
4592 packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4593 }
4594 } else {
4595 // Tell the coordinator that we are gone
4596 if (gProofServ) {
 // Keep only the part of the ordinal that follows the last '.'.
 // NOTE(review): listing line 4600 is absent; how 'ord' is used is not
 // visible here.
4597 TString ord(wrk->GetOrdinal());
4598 Int_t id = ord.Last('.');
4599 if (id != kNPOS) ord.Remove(0, id+1);
4601 }
4602 }
4603 } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4604 // We are invalid after this
4605 fValid = kFALSE;
4606 }
4607
 // Detach the worker from the active list and from fAllMonitor.
 // NOTE(review): listing lines 4609, 4612 and 4614 are absent; further
 // removals may have been on them.
4608 fActiveSlaves->Remove(wrk);
4610
4611 fAllMonitor->Remove(wrk->GetSocket());
4613
4615
4616 if (IsMaster()) {
4617 if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4618 // if the reason was a planned termination then delete the worker and
4619 // remove it from all the lists
4620 fSlaves->Remove(wrk);
4621 fBadSlaves->Remove(wrk);
4622 fActiveSlaves->Remove(wrk);
4623 fInactiveSlaves->Remove(wrk);
4624 fUniqueSlaves->Remove(wrk);
 // NOTE(review): listing lines 4625-4626 are absent.
4627
4628 // we add it to the list of terminated slave infos instead, so that it
4629 // stays available in the .workers persistent file
4630 TSlaveInfo *si = new TSlaveInfo(
4631 wrk->GetOrdinal(),
4632 Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4633 0, "", wrk->GetWorkDir());
 // NOTE(review): listing line 4634, which holds the 'if' paired with the
 // 'else' below, is absent; only the branch that discards 'si' is visible.
4635 else delete si;
4636
 // The worker object itself is destroyed on this path.
4637 delete wrk;
4638 } else {
 // Not a planned termination: the worker object is kept, moved to the bad
 // list and closed.
4639 fBadSlaves->Add(wrk);
4640 fActiveSlaves->Remove(wrk);
4641 fUniqueSlaves->Remove(wrk);
 // NOTE(review): listing lines 4642-4644 are absent.
4645 wrk->Close();
4646 // Update the mergers count, if needed
4647 if (fMergersSet) {
4648 Int_t mergersCount = -1;
4649 TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4650 if (mc) mergersCount = mc->GetVal(); // Value set by user
4651 // Mergers count is set dynamically: recalculate it
4652 if (mergersCount == 0) {
 // NOTE(review): 'activeWorkers' is declared on listing line 4653, which is
 // absent; its initial value is not visible here.
 // The count is sqrt(activeWorkers) rounded to nearest; if that leaves fewer
 // than 2 workers per merger (integer division), the truncated square root
 // is used instead.
4654 if (activeWorkers > 1) {
4655 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4656 if (activeWorkers / fMergersCount < 2)
4657 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4658 }
4659 }
4660 }
4661 }
4662
4663 // Update session workers files
 // NOTE(review): the statement for the comment above (listing line 4664) is absent.
4665 } else {
4666 // On clients the proof session should be removed from the lists
4667 // and deleted, since it is not valid anymore
4668 fSlaves->Remove(wrk);
4669 if (fManager)
4670 fManager->DiscardSession(this);
4671 }
4672}
4673
4674////////////////////////////////////////////////////////////////////////////////
4675/// Add slave with socket s to the bad slave list and remove it from
4676/// the active list and from the two monitor objects.
4677
4678void TProof::MarkBad(TSocket *s, const char *reason)
4679{
 // Socket-based overload: resolves 's' to a TSlave via FindSlave() and forwards
 // to MarkBad(TSlave *, const char *) with the same 'reason'.

 // fCloseMutex is a std::recursive_mutex, so the forwarded call can lock it
 // again on the same thread while it is still held here.
4680 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4681
4682 // We may have been invalidated in the meanwhile: nothing to do in such a case
4683 if (!IsValid()) return;
4684
 // The lookup result is passed on unchecked; the TSlave overload reports an
 // error and returns when it receives a null worker.
 // NOTE(review): presumably FindSlave() returns null when no worker uses 's' - confirm.
4685 TSlave *wrk = FindSlave(s);
4686 MarkBad(wrk, reason);
4687}
4688
4689////////////////////////////////////////////////////////////////////////////////
4690/// Ask an active worker 'wrk' to terminate, i.e. to shutdown
4691
4693{
4694 if (!wrk) {
4695 Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4696 return;
4697 }
4698
4699 // Send stop message