Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TProof.cxx
Go to the documentation of this file.
1// @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2// Author: Fons Rademakers 13/02/97
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11/**
12 \defgroup proof PROOF
13
14 Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
15
16*/
17
18/**
19 \defgroup proofkernel PROOF kernel Libraries
20 \ingroup proof
21
22 The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
23 the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
24 and handling of results.
25
26*/
27
28/** \class TProof
29\ingroup proofkernel
30
31This class controls a Parallel ROOT Facility, PROOF, cluster.
32It fires the worker servers, it keeps track of how many workers are
33running, it keeps track of the workers running status, it broadcasts
34messages to all workers, it collects results, etc.
35
36*/
37
38#include <stdlib.h>
39#include <fcntl.h>
40#include <errno.h>
41#ifdef WIN32
42# include <io.h>
43# include <sys/stat.h>
44# include <sys/types.h>
45# include "snprintf.h"
46#else
47# include <unistd.h>
48#endif
49#include <vector>
50
51#include "RConfigure.h"
52#include "Riostream.h"
53#include "Getline.h"
54#include "TBrowser.h"
55#include "TChain.h"
56#include "TCondor.h"
57#include "TDSet.h"
58#include "TError.h"
59#include "TEnv.h"
60#include "TEntryList.h"
61#include "TEventList.h"
62#include "TFile.h"
63#include "TFileInfo.h"
64#include "TFTP.h"
65#include "THashList.h"
66#include "TInterpreter.h"
67#include "TKey.h"
68#include "TMap.h"
69#include "TMath.h"
70#include "TMessage.h"
71#include "TMonitor.h"
72#include "TObjArray.h"
73#include "TObjString.h"
74#include "TParameter.h"
75#include "TProof.h"
76#include "TProofNodeInfo.h"
77#include "TProofOutputFile.h"
78#include "TVirtualProofPlayer.h"
79#include "TVirtualPacketizer.h"
80#include "TProofServ.h"
81#include "TPluginManager.h"
82#include "TQueryResult.h"
83#include "TRandom.h"
84#include "TRegexp.h"
85#include "TROOT.h"
86#include "TSlave.h"
87#include "TSocket.h"
88#include "TSortedList.h"
89#include "TSystem.h"
90#include "TTree.h"
91#include "TUrl.h"
92#include "TFileCollection.h"
93#include "TDataSetManager.h"
94#include "TDataSetManagerFile.h"
95#include "TMacro.h"
96#include "TSelector.h"
97#include "TPRegexp.h"
98#include "TPackMgr.h"
99
100#include <mutex>
101
103
104// Rotating indicator
105char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
106
107TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
108TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
109
111
112//----- PROOF Interrupt signal handler -----------------------------------------
113////////////////////////////////////////////////////////////////////////////////
114/// TProof interrupt handler.
115
117{
118 if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
119
120 // Cannot ask the user : abort any remote processing
122
123 } else {
124 // Real stop or request to switch to asynchronous?
125 const char *a = 0;
126 if (fProof->GetRemoteProtocol() < 22) {
127 a = Getline("\nSwitch to asynchronous mode not supported remotely:"
128 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
129 } else {
130 a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
131 " any other key to continue: ");
132 }
133 if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
134
135 Info("Notify","Processing interrupt signal ... %c", a[0]);
136
137 // Stop or abort any remote processing
138 Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
139 fProof->StopProcess(abort);
140
141 } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
142 // Stop any remote processing
144 }
145 }
146
147 return kTRUE;
148}
149
150//----- Input handler for messages from TProofServ -----------------------------
151////////////////////////////////////////////////////////////////////////////////
152/// Constructor
153
155 : TFileHandler(s->GetDescriptor(),1),
156 fSocket(s), fProof(p)
157{
158}
159
160////////////////////////////////////////////////////////////////////////////////
161/// Handle input
162
164{
166 return kTRUE;
167}
168
169
170//------------------------------------------------------------------------------
171
173
174////////////////////////////////////////////////////////////////////////////////
175/// Used to sort slaveinfos by ordinal.
176
178{
179 if (!obj) return 1;
180
181 const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
182
183 if (!si) return fOrdinal.CompareTo(obj->GetName());
184
185 const char *myord = GetOrdinal();
186 const char *otherord = si->GetOrdinal();
187 while (myord && otherord) {
188 Int_t myval = atoi(myord);
189 Int_t otherval = atoi(otherord);
190 if (myval < otherval) return 1;
191 if (myval > otherval) return -1;
192 myord = strchr(myord, '.');
193 if (myord) myord++;
194 otherord = strchr(otherord, '.');
195 if (otherord) otherord++;
196 }
197 if (myord) return -1;
198 if (otherord) return 1;
199 return 0;
200}
201
202////////////////////////////////////////////////////////////////////////////////
203/// Used to compare slaveinfos by ordinal.
204
206{
207 if (!obj) return kFALSE;
208 const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
209 if (!si) return kFALSE;
210 return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
211}
212
213////////////////////////////////////////////////////////////////////////////////
214/// Print slave info. If opt = "active" print only the active
215/// slaves, if opt="notactive" print only the not active slaves,
216/// if opt = "bad" print only the bad slaves, else
217/// print all slaves.
218
220{
221 TString stat = fStatus == kActive ? "active" :
222 fStatus == kBad ? "bad" :
223 "not active";
224
225 Bool_t newfmt = kFALSE;
226 TString oo(opt);
227 if (oo.Contains("N")) {
228 newfmt = kTRUE;
229 oo.ReplaceAll("N","");
230 }
231 if (oo == "active" && fStatus != kActive) return;
232 if (oo == "notactive" && fStatus != kNotActive) return;
233 if (oo == "bad" && fStatus != kBad) return;
234
235 if (newfmt) {
236 TString msd, si, datadir;
237 if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
238 if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
239 if (fSysInfo.fCpus > 0) {
240 si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
242 } else {
243 si.Form("| %s", fHostName.Data());
244 }
245 Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
246
247 } else {
248 TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
249
250 std::cout << "Slave: " << fOrdinal
251 << " hostname: " << fHostName
252 << " msd: " << msd
253 << " perf index: " << fPerfIndex
254 << " " << stat
255 << std::endl;
256 }
257}
258
259////////////////////////////////////////////////////////////////////////////////
260/// Setter for fSysInfo
261
263{
264 fSysInfo.fOS = si.fOS; // OS
265 fSysInfo.fModel = si.fModel; // computer model
266 fSysInfo.fCpuType = si.fCpuType; // type of cpu
267 fSysInfo.fCpus = si.fCpus; // number of cpus
268 fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
269 fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
270 fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
271 fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
272}
273
275
276//------------------------------------------------------------------------------
277
278////////////////////////////////////////////////////////////////////////////////
279/// Destructor
280
282{
283 // Just delete the list, the objects are owned by other list
284 if (fWorkers) {
287 }
288}
289////////////////////////////////////////////////////////////////////////////////
290/// Increase number of already merged workers by 1
291
293{
295 Error("SetMergedWorker", "all workers have been already merged before!");
296 else
298}
299
300////////////////////////////////////////////////////////////////////////////////
301/// Add new worker to the list of workers to be merged by this merger
302
304{
305 if (!fWorkers)
306 fWorkers = new TList();
307 if (fWorkersToMerge == fWorkers->GetSize()) {
308 Error("AddWorker", "all workers have been already assigned to this merger");
309 return;
310 }
311 fWorkers->Add(sl);
312}
313
314////////////////////////////////////////////////////////////////////////////////
315/// Return if merger has already merged all workers, i.e. if it has finished its merging job
316
318{
320}
321
322////////////////////////////////////////////////////////////////////////////////
323/// Return if the determined number of workers has been already assigned to this merger
324
326{
327 if (!fWorkers)
328 return kFALSE;
329
330 return (fWorkers->GetSize() == fWorkersToMerge);
331}
332
333////////////////////////////////////////////////////////////////////////////////
334/// This a private API function.
335/// It checks whether the connection string contains a PoD cluster protocol.
336/// If it does, then the connection string will be changed to reflect
337/// a real PROOF connection string for a PROOF cluster managed by PoD.
338/// PoD: http://pod.gsi.de .
339/// Return -1 if the PoD request failed; return 0 otherwise.
340
341static Int_t PoDCheckUrl(TString *_cluster)
342{
343 if ( !_cluster )
344 return 0;
345
346 // trim spaces from both sides of the string
347 *_cluster = _cluster->Strip( TString::kBoth );
348 // PoD protocol string
349 const TString pod_prot("pod");
350
351 // URL test
352 // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
353 TUrl url( _cluster->Data() );
354 if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
355 return 0;
356
357 // PoD cluster is used
358 // call pod-info in a batch mode (-b).
359 // pod-info will find either a local PoD cluster or
360 // a remote one, manged by pod-remote.
361 *_cluster = gSystem->GetFromPipe("pod-info -c -b");
362 if( 0 == _cluster->Length() ) {
363 Error("PoDCheckUrl", "PoD server is not running");
364 return -1;
365 }
366 return 0;
367}
368
369////////////////////////////////////////////////////////////////////////////////
370/// Create a PROOF environment. Starting PROOF involves either connecting
371/// to a master server, which in turn will start a set of slave servers, or
372/// directly starting as master server (if master = ""). Masterurl is of
373/// the form: [proof[s]://]host[:port]. Conffile is the name of the config
374/// file describing the remote PROOF cluster (this argument allows you to
375/// describe different cluster configurations).
376/// The default is proof.conf. Confdir is the directory where the config
377/// file and other PROOF related files are (like motd and noproof files).
378/// Loglevel is the log level (default = 1). User specified custom config
379/// files will be first looked for in $HOME/.conffile.
380
381TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
382 Int_t loglevel, const char *alias, TProofMgr *mgr)
383 : fUrl(masterurl)
384{
385 // Default initializations
386 InitMembers();
387
388 // This may be needed during init
389 fManager = mgr;
390
391 // Default server type
393
394 // Default query mode
396
397 // Parse the main URL, adjusting the missing fields and setting the relevant
398 // bits
401
402 // Protocol and Host
403 if (!masterurl || strlen(masterurl) <= 0) {
404 fUrl.SetProtocol("proof");
405 fUrl.SetHost("__master__");
406 } else if (!(strstr(masterurl, "://"))) {
407 fUrl.SetProtocol("proof");
408 }
409 // Port
410 if (fUrl.GetPort() == TUrl(" ").GetPort())
411 fUrl.SetPort(TUrl("proof:// ").GetPort());
412
413 // Make sure to store the FQDN, so to get a solid reference for subsequent checks
414 if (!strcmp(fUrl.GetHost(), "__master__"))
415 fMaster = fUrl.GetHost();
416 else if (!strlen(fUrl.GetHost()))
418 else
420
421 // Server type
422 if (strlen(fUrl.GetOptions()) > 0) {
423 TString opts(fUrl.GetOptions());
424 if (!(strncmp(fUrl.GetOptions(),"std",3))) {
426 opts.Remove(0,3);
427 fUrl.SetOptions(opts.Data());
428 } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
430 opts.Remove(0,4);
431 fUrl.SetOptions(opts.Data());
432 }
433 }
434
435 // Instance type
439 if (fMaster == "__master__") {
443 } else if (fMaster == "prooflite") {
444 // Client and master are merged
447 }
448 // Flag that we are a client
450 if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
451
452 Init(masterurl, conffile, confdir, loglevel, alias);
453
454 // If the user was not set, get it from the master
455 if (strlen(fUrl.GetUser()) <= 0) {
456 TString usr, emsg;
457 if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
458 TObjString *os = fMacroLog.GetLineWith("const char");
459 if (os) {
460 Ssiz_t fst = os->GetString().First('\"');
461 Ssiz_t lst = os->GetString().Last('\"');
462 usr = os->GetString()(fst+1, lst-fst-1);
463 } else {
464 emsg = "could not find 'const char *' string in macro log";
465 }
466 } else {
467 emsg = "could not retrieve user info";
468 }
469 if (!emsg.IsNull()) {
470 // Get user logon name
472 if (pw) {
473 usr = pw->fUser;
474 delete pw;
475 }
476 Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
477 }
478 // Set the user name in the main URL
479 fUrl.SetUser(usr.Data());
480 }
481
482 // If called by a manager, make sure it stays in last position
483 // for cleaning
484 if (mgr) {
486 gROOT->GetListOfSockets()->Remove(mgr);
487 gROOT->GetListOfSockets()->Add(mgr);
488 }
489
490 // Old-style server type: we add this to the list and set the global pointer
492 if (!gROOT->GetListOfProofs()->FindObject(this))
493 gROOT->GetListOfProofs()->Add(this);
494
495 // Still needed by the packetizers: needs to be changed
496 gProof = this;
497}
498
499////////////////////////////////////////////////////////////////////////////////
500/// Protected constructor to be used by classes deriving from TProof
501/// (they have to call Init themselves and override StartSlaves
502/// appropriately).
503///
504/// This constructor simply closes any previous gProof and sets gProof
505/// to this instance.
506
507TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
508{
509 // Default initializations
510 InitMembers();
511
512 if (!gROOT->GetListOfProofs()->FindObject(this))
513 gROOT->GetListOfProofs()->Add(this);
514
515 gProof = this;
516}
517
518////////////////////////////////////////////////////////////////////////////////
519/// Default initializations
520
522{
523 fValid = kFALSE;
524 fTty = kFALSE;
525 fRecvMessages = 0;
526 fSlaveInfo = 0;
531 fActiveSlaves = 0;
532 fInactiveSlaves = 0;
533 fUniqueSlaves = 0;
536 fActiveMonitor = 0;
537 fUniqueMonitor = 0;
539 fCurrentMonitor = 0;
540 fBytesRead = 0;
541 fRealTime = 0;
542 fCpuTime = 0;
543 fIntHandler = 0;
544 fProgressDialog = 0;
547 fPlayer = 0;
548 fFeedback = 0;
549 fChains = 0;
550 fDSet = 0;
551 fNotIdle = 0;
552 fSync = kTRUE;
556 fLogFileW = 0;
557 fLogFileR = 0;
560 fMacroLog.SetName("ProofLogMacro");
561
562 fWaitingSlaves = 0;
563 fQueries = 0;
564 fOtherQueries = 0;
565 fDrawQueries = 0;
566 fMaxDrawQueries = 1;
567 fSeqNum = 0;
568
569 fSessionID = -1;
571
572 fPackMgr = 0;
574
575 fInputData = 0;
576
577 fPrintProgress = 0;
578
579 fLoadedMacros = 0;
580
581 fProtocol = -1;
582 fSlaves = 0;
584 fBadSlaves = 0;
585 fAllMonitor = 0;
587 fBytesReady = 0;
588 fTotalBytes = 0;
591 fRunningDSets = 0;
592
593 fCollectTimeout = -1;
594
595 fManager = 0;
598
601 fMergers = 0;
602 fMergersCount = -1;
604 fWorkersToMerge = 0;
606
607 fPerfTree = "";
608
610
611 fSelector = 0;
612
613 fPrepTime = 0.;
614
615 // Check if the user defined a list of environment variables to send over:
616 // include them into the dedicated list
617 if (gSystem->Getenv("PROOF_ENVVARS")) {
618 TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
619 Int_t from = 0;
620 while (envs.Tokenize(env, from, ",")) {
621 if (!env.IsNull()) {
622 if (!gSystem->Getenv(env)) {
623 Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
624 } else {
625 if (!envsfound.IsNull()) envsfound += ",";
626 envsfound += env;
628 TProof::AddEnvVar(env, gSystem->Getenv(env));
629 }
630 }
631 }
632 if (envsfound.IsNull()) {
633 Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
634 } else {
635 Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
636 }
637 }
638
639 // Done
640 return;
641}
642
643////////////////////////////////////////////////////////////////////////////////
644/// Clean up PROOF environment.
645
647{
648 if (fChains) {
649 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
650 // remove "chain" from list
651 chain->SetProof(0);
652 RemoveChain(chain);
653 }
654 }
655
656 // remove links to packages enabled on the client
658 // iterate over all packages
660 if (epl) {
661 TIter nxp(epl);
662 while (TObjString *pck = (TObjString *)(nxp())) {
663 FileStat_t stat;
664 if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
665 // check if symlink, if so unlink
666 // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
667 // existing file or to a directory, but if fIsLink is true the symlink exists
668 if (stat.fIsLink)
669 gSystem->Unlink(pck->String());
670 }
671 }
672 epl->Delete();
673 delete epl;
674 }
675 }
676
677 Close();
704 if (fWrksOutputReady) {
706 delete fWrksOutputReady;
707 }
708 if (fQueries) {
709 fQueries->Delete();
710 delete fQueries;
711 }
712
713 // remove file with redirected logs
715 if (fLogFileR)
716 fclose(fLogFileR);
717 if (fLogFileW)
718 fclose(fLogFileW);
719 if (fLogFileName.Length() > 0)
721 }
722
723 // Remove for the global list
724 gROOT->GetListOfProofs()->Remove(this);
725 // ... and from the manager list
726 if (fManager && fManager->IsValid())
728
729 if (gProof && gProof == this) {
730 // Set previous as default
731 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
732 while ((gProof = (TProof *)pvp())) {
733 if (gProof->InheritsFrom(TProof::Class()))
734 break;
735 }
736 }
737
738 // For those interested in our destruction ...
739 Emit("~TProof()");
740 Emit("CloseWindow()");
741}
742
743////////////////////////////////////////////////////////////////////////////////
744/// Start the PROOF environment. Starting PROOF involves either connecting
745/// to a master server, which in turn will start a set of slave servers, or
746/// directly starting as master server (if master = ""). For a description
747/// of the arguments see the TProof ctor. Returns the number of started
748/// master or slave servers, returns 0 in case of error, in which case
749/// fValid remains false.
750
751Int_t TProof::Init(const char *, const char *conffile,
752 const char *confdir, Int_t loglevel, const char *alias)
753{
755
756 fValid = kFALSE;
757
758 // Connected to terminal?
759 fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
760
761 // If in attach mode, options is filled with additional info
762 Bool_t attach = kFALSE;
763 if (strlen(fUrl.GetOptions()) > 0) {
764 attach = kTRUE;
765 // A flag from the GUI
766 TString opts = fUrl.GetOptions();
767 if (opts.Contains("GUI")) {
769 opts.Remove(opts.Index("GUI"));
770 fUrl.SetOptions(opts);
771 }
772 }
773
775 // Fill default conf file and conf dir
776 if (!conffile || !conffile[0])
778 if (!confdir || !confdir[0])
780 // The group; the client receives it in the kPROOF_SESSIONTAG message
782 } else {
783 fConfDir = confdir;
784 fConfFile = conffile;
785 }
786
787 // Analyse the conffile field
788 if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
790
792 fLogLevel = loglevel;
795 fImage = fMasterServ ? "" : "<local>";
796 fIntHandler = 0;
797 fStatus = 0;
798 fRecvMessages = new TList;
800 fSlaveInfo = 0;
801 fChains = new TList;
804 fRunningDSets = 0;
806 fInputData = 0;
808 fPrintProgress = 0;
809
810 // Timeout for some collect actions
811 fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
812
813 // Should the workers be started dynamically; default: no
814 fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
815
816 // Default entry point for the data pool is the master
818 fDataPoolUrl.Form("root://%s", fMaster.Data());
819 else
820 fDataPoolUrl = "";
821
822 fProgressDialog = 0;
824
825 // Default alias is the master name
826 TString al = (alias) ? alias : fMaster.Data();
827 SetAlias(al);
828
829 // Client logging of messages from the master and slaves
832 fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
833 if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
834 Error("Init", "could not create temporary logfile");
835 if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
836 Error("Init", "could not open temp logfile for reading");
837 }
839
840 // Status of cluster
841 fNotIdle = 0;
842 // Query type
843 fSync = (attach) ? kFALSE : kTRUE;
844 // Not enqueued
846
847 // Counters
848 fBytesRead = 0;
849 fRealTime = 0;
850 fCpuTime = 0;
851
852 // List of queries
853 fQueries = 0;
854 fOtherQueries = 0;
855 fDrawQueries = 0;
856 fMaxDrawQueries = 1;
857 fSeqNum = 0;
858
859 // Remote ID of the session
860 fSessionID = -1;
861
862 // Part of active query
863 fWaitingSlaves = 0;
864
865 // Make remote PROOF player
866 fPlayer = 0;
867 MakePlayer();
868
869 fFeedback = new TList;
871 fFeedback->SetName("FeedbackList");
873
874 // sort slaves by descending performance index
876 fActiveSlaves = new TList;
878 fUniqueSlaves = new TList;
881 fBadSlaves = new TList;
882 fAllMonitor = new TMonitor;
886 fCurrentMonitor = 0;
887
890
891 fLoadedMacros = 0;
892 fPackMgr = 0;
893
894 // Enable optimized sending of streamer infos to use embedded backward/forward
895 // compatibility support between different ROOT versions and different versions of
896 // users classes
897 Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
898 if (enableSchemaEvolution) {
900 } else {
901 Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
902 }
903
904 if (IsMaster()) {
905 // to make UploadPackage() method work on the master as well.
907 } else {
908
909 TString sandbox;
910 if (GetSandbox(sandbox, kTRUE) != 0) {
911 Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
912 return 0;
913 }
914
915 // Package Dir
916 TString packdir = gEnv->GetValue("Proof.PackageDir", "");
917 if (packdir.IsNull())
918 packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
919 if (AssertPath(packdir, kTRUE) != 0) {
920 Error("Init", "failure asserting directory %s", packdir.Data());
921 return 0;
922 }
923 fPackMgr = new TPackMgr(packdir);
924 if (gDebug > 0)
925 Info("Init", "package directory set to %s", packdir.Data());
926 }
927
928 if (!IsMaster()) {
929 // List of directories where to look for global packages
930 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
932 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
933 if (gDebug > 0)
934 Info("Init", " %d global package directories registered", nglb);
935 }
936
937 // Master may want dynamic startup
938 if (fDynamicStartup) {
939 if (!IsMaster()) {
940 // If on client - start the master
941 if (!StartSlaves(attach))
942 return 0;
943 }
944 } else {
945
946 // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
947 // result retrieving, ...)
948 Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
949 if (!IsMaster() || !masterOnly) {
950 // Start slaves (the old, static, per-session way)
951 if (!StartSlaves(attach))
952 return 0;
953 // Client: Is Master in dynamic startup mode?
954 if (!IsMaster()) {
955 Int_t dyn = 0;
956 GetRC("Proof.DynamicStartup", dyn);
957 if (dyn != 0) fDynamicStartup = kTRUE;
958 }
959 }
960 }
961 // we are now properly initialized
962 fValid = kTRUE;
963
964 // De-activate monitor (will be activated in Collect)
966
967 // By default go into parallel mode
968 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
969 TNamed *n = 0;
970 if (TProof::GetEnvVars() &&
971 (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
972 TString s(n->GetTitle());
973 if (s.IsDigit()) nwrk = s.Atoi();
974 }
975 GoParallel(nwrk, attach);
976
977 // Send relevant initial state to slaves
978 if (!attach)
980 else if (!IsIdle())
981 // redirect log
983
984 // Done at this point, the alias will be communicated to the coordinator, if any
986 SetAlias(al);
987
989
990 if (IsValid()) {
991
992 // Activate input handler
994
996 gROOT->GetListOfSockets()->Add(this);
997 }
998
999 AskParallel();
1000
1001 return fActiveSlaves->GetSize();
1002}
1003
1004////////////////////////////////////////////////////////////////////////////////
1005/// Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
1006/// Use the existing setting or the default if nothing is found.
1007/// If 'assert' is kTRUE, make also sure that the path exists.
1008/// Return 0 on success, -1 on failure
1009
1010Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1011{
1012 // Get it from 'rc', if defined
1013 if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1014 // Or use the default 'rc'
1015 if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1016 // If nothing found , use the default
1017 if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1018 // Expand special settings
1019 if (sb == ".") {
1020 sb = gSystem->pwd();
1021 } else if (sb == "..") {
1022 sb = gSystem->GetDirName(gSystem->pwd());
1023 }
1025
1026 // Assert the path, if required
1027 if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1028 // Done
1029 return 0;
1030}
1031
1032////////////////////////////////////////////////////////////////////////////////
1033/// The config file field may contain special instructions which need to be
1034/// parsed at the beginning, e.g. for debug runs with valgrind.
1035/// Several options can be given separated by a ','
1036
1037void TProof::ParseConfigField(const char *config)
1038{
1039 TString sconf(config), opt;
1040 Ssiz_t from = 0;
1041 Bool_t cpuPin = kFALSE;
1042
1043 // Analyse the field
1044 const char *cq = (IsLite()) ? "\"" : "";
1045 while (sconf.Tokenize(opt, from, ",")) {
1046 if (opt.IsNull()) continue;
1047
1048 if (opt.BeginsWith("valgrind")) {
1049 // Any existing valgrind setting? User can give full settings, which we fully respect,
1050 // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1051 // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1052 // will add option "--time-stamp --leak-check=full" to our default options
1053 TString mst, top, sub, wrk, all;
1054 TList *envs = fgProofEnvList;
1055 TNamed *n = 0;
1056 if (envs) {
1057 if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1058 all = n->GetTitle();
1059 if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1060 mst = n->GetTitle();
1061 if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1062 top = n->GetTitle();
1063 if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1064 sub = n->GetTitle();
1065 if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1066 wrk = n->GetTitle();
1067 }
1068 if (all != "" && mst == "") mst = all;
1069 if (all != "" && top == "") top = all;
1070 if (all != "" && sub == "") sub = all;
1071 if (all != "" && wrk == "") wrk = all;
1072 if (all != "" && all.BeginsWith("valgrind_opts:")) {
1073 // The field is used to add an option Reset the setting
1074 Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1075 " must be set again for next run , if any");
1076 TProof::DelEnvVar("PROOF_WRAPPERCMD");
1077 }
1078 TString var, cmd;
1079 cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1080 TString mstlab("NO"), wrklab("NO");
1081 Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1082 !opt.Contains("topmaster") && !opt.Contains("submaster")))
1083 ? kTRUE : kFALSE;
1084 if (doMaster) {
1085 if (!IsLite()) {
1086 // Check if we have to add a var
1087 if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1088 mst.ReplaceAll("valgrind_opts:","");
1089 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1090 TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1091 mstlab = "YES";
1092 } else if (mst != "") {
1093 mstlab = "YES";
1094 }
1095 } else {
1096 if (opt.Contains("master")) {
1097 Warning("ParseConfigField",
1098 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1099 opt.ReplaceAll("master", "");
1100 if (!opt.Contains("workers")) return;
1101 }
1102 if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1103 }
1104 }
1105 if (opt.Contains("topmaster")) {
1106 // Check if we have to add a var
1107 if (top == "" || top.BeginsWith("valgrind_opts:")) {
1108 top.ReplaceAll("valgrind_opts:","");
1109 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1110 TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1111 mstlab = "YES";
1112 } else if (top != "") {
1113 mstlab = "YES";
1114 }
1115 }
1116 if (opt.Contains("submaster")) {
1117 // Check if we have to add a var
1118 if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1119 sub.ReplaceAll("valgrind_opts:","");
1120 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1121 TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1122 mstlab = "YES";
1123 } else if (sub != "") {
1124 mstlab = "YES";
1125 }
1126 }
1127 if (opt.Contains("=workers") || opt.Contains("+workers")) {
1128 // Check if we have to add a var
1129 if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1130 wrk.ReplaceAll("valgrind_opts:","");
1131 var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1132 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1133 TString nwrks("2");
1134 Int_t inw = opt.Index('#');
1135 if (inw != kNPOS) {
1136 nwrks = opt(inw+1, opt.Length());
1137 if (!nwrks.IsDigit()) nwrks = "2";
1138 }
1139 // Set the relevant variables
1140 if (!IsLite()) {
1141 TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1142 } else {
1143 gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1144 }
1145 wrklab = nwrks;
1146 // Register the additional worker log in the session file
1147 // (for the master this is done automatically)
1148 TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1149 } else if (wrk != "") {
1150 wrklab = "ALL";
1151 }
1152 }
1153 // Increase the relevant timeouts
1154 if (!IsLite()) {
1155 TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1156 gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1157 } else {
1158 gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1159 }
1160 // Warn for slowness
1161 Printf(" ");
1162 if (!IsLite()) {
1163 Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1164 } else {
1165 Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1166 }
1167 Printf(" ---> Please be patient: startup may be VERY slow ...");
1168 Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1169 Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1170 Printf(" ");
1171
1172 } else if (opt.BeginsWith("igprof-pp")) {
1173
1174 // IgProf profiling on master and worker. PROOF does not set the
1175 // environment for you: proper environment variables (like PATH and
1176 // LD_LIBRARY_PATH) should be set externally
1177
1178 Printf("*** Requested IgProf performance profiling ***");
1179 TString addLogExt = "__igprof.pp__.log";
1180 TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1181 TString tmp;
1182
1183 if (IsLite()) {
1184 addLogFmt.Append("\"");
1185 addLogFmt.Prepend("\"");
1186 }
1187
1188 tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1189 TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1190
1191 tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1192 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1193
1194 TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1195
1196 } else if (opt.BeginsWith("cpupin=")) {
1197 // Enable CPU pinning. Takes as argument the list of processor IDs
1198 // that will be used in order. Processor IDs are numbered from 0,
1199 // use likwid to see how they are organized. A possible parameter
1200 // format would be:
1201 //
1202 // cpupin=3+4+0+9+10+22+7
1203 //
1204 // Only the specified processor IDs will be used in a round-robin
1205 // fashion, dealing with the fact that you can request more workers
1206 // than the number of processor IDs you have specified.
1207 //
1208 // To use all available processors in their order:
1209 //
1210 // cpupin=*
1211
1212 opt.Remove(0, 7);
1213
1214 // Remove any char which is neither a number nor a plus '+'
1215 for (Ssiz_t i=0; i<opt.Length(); i++) {
1216 Char_t c = opt[i];
1217 if ((c != '+') && ((c < '0') || (c > '9')))
1218 opt[i] = '_';
1219 }
1220 opt.ReplaceAll("_", "");
1221 TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1222 cpuPin = kTRUE;
1223 } else if (opt.BeginsWith("workers=")) {
1224
1225 // Request for a given number of workers (within the max) or worker
1226 // startup combination:
1227 // workers=5 start max 5 workers (or less, if less are assigned)
1228 // workers=2x start max 2 workers per node (or less, if less are assigned)
1229 opt.ReplaceAll("workers=","");
1230 TProof::AddEnvVar("PROOF_NWORKERS", opt);
1231 }
1232 }
1233
1234 // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1235 #ifdef R__LINUX
1236 if (IsLite() && cpuPin) {
1237 Printf("*** Requested CPU pinning ***");
1238 const TList *ev = GetEnvVars();
1239 const char *pinCmd = "taskset -c <cpupin>";
1240 TString val;
1241 TNamed *p;
1242 if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1243 val = p->GetTitle();
1244 val.Insert(val.Length()-1, " ");
1245 val.Insert(val.Length()-1, pinCmd);
1246 }
1247 else {
1248 val.Form("\"%s\"", pinCmd);
1249 }
1250 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1251 }
1252 #endif
1253}
1254
1255////////////////////////////////////////////////////////////////////////////////
1256/// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1257/// that the path is writable
1258
1259Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1260{
1261 if (!inpath || strlen(inpath) <= 0) {
1262 Error("AssertPath", "undefined input path");
1263 return -1;
1264 }
1265
1266 TString path(inpath);
1267 gSystem->ExpandPathName(path);
1268
1269 if (gSystem->AccessPathName(path, kFileExists)) {
1270 if (gSystem->mkdir(path, kTRUE) != 0) {
1271 Error("AssertPath", "could not create path %s", path.Data());
1272 return -1;
1273 }
1274 }
1275 // It must be writable
1276 if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1277 if (gSystem->Chmod(path, 0666) != 0) {
1278 Error("AssertPath", "could not make path %s writable", path.Data());
1279 return -1;
1280 }
1281 }
1282
1283 // Done
1284 return 0;
1285}
1286
1287////////////////////////////////////////////////////////////////////////////////
1288/// Set manager and schedule its destruction after this for clean
1289/// operations.
1290
1292{
1293 fManager = mgr;
1294
1295 if (mgr) {
1297 gROOT->GetListOfSockets()->Remove(mgr);
1298 gROOT->GetListOfSockets()->Add(mgr);
1299 }
1300}
1301
1302////////////////////////////////////////////////////////////////////////////////
1303/// Works on the master node only.
1304/// It starts workers on the machines in workerList and sets the paths,
1305/// packages and macros as on the master.
1306/// It is a substitute for StartSlaves(...)
1307/// The code is mostly the master part of StartSlaves,
1308/// with the parallel startup removed.
1309
1311{
1312 if (!IsMaster()) {
1313 Error("AddWorkers", "AddWorkers can only be called on the master!");
1314 return -1;
1315 }
1316
1317 if (!workerList || !(workerList->GetSize())) {
1318 Error("AddWorkers", "empty list of workers!");
1319 return -2;
1320 }
1321
1322 // Code taken from master part of StartSlaves with the parallel part removed
1323
1325 if (fImage.IsNull())
1327
1328 // Get all workers
1329 UInt_t nSlaves = workerList->GetSize();
1330 UInt_t nSlavesDone = 0;
1331 Int_t ord = 0;
1332
1333 // Loop over all new workers and start them (if we had already workers it means we are
1334 // increasing parallelism or that is not the first time we are called)
1335 Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1336
1337 // A list of TSlave objects for workers that are being added
1338 TList *addedWorkers = new TList();
1339 if (!addedWorkers) {
1340 // This is needed to silence Coverity ...
1341 Error("AddWorkers", "cannot create new list for the workers to be added");
1342 return -2;
1343 }
1344 addedWorkers->SetOwner(kFALSE);
1345 TListIter next(workerList);
1346 TObject *to;
1347 TProofNodeInfo *worker;
1348 TSlaveInfo *dummysi = new TSlaveInfo();
1349 while ((to = next())) {
1350 // Get the next worker from the list
1351 worker = (TProofNodeInfo *)to;
1352
1353 // Read back worker node info
1354 const Char_t *image = worker->GetImage().Data();
1355 const Char_t *workdir = worker->GetWorkDir().Data();
1356 Int_t perfidx = worker->GetPerfIndex();
1357 Int_t sport = worker->GetPort();
1358 if (sport == -1)
1359 sport = fUrl.GetPort();
1360
1361 // Create worker server
1362 TString fullord;
1363 if (worker->GetOrdinal().Length() > 0) {
1364 fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1365 } else {
1366 fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1367 }
1368
1369 // Remove worker from the list of workers terminated gracefully
1370 dummysi->SetOrdinal(fullord);
1372 SafeDelete(rmsi);
1373
1374 // Create worker server
1375 TString wn(worker->GetNodeName());
1376 if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1377 TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1378 // Add group info in the password field, if any
1379 if (strlen(gProofServ->GetGroup()) > 0) {
1380 // Set also the user, otherwise the password is not exported
1381 if (strlen(u.GetUser()) <= 0)
1384 }
1385 TSlave *slave = 0;
1386 if (worker->IsWorker()) {
1387 slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1388 } else {
1389 slave = CreateSubmaster(u.GetUrl(), fullord,
1390 image, worker->GetMsd(), worker->GetNWrks());
1391 }
1392
1393 // Add to global list (we will add to the monitor list after
1394 // finalizing the server startup)
1395 Bool_t slaveOk = kTRUE;
1396 fSlaves->Add(slave);
1397 if (slave->IsValid()) {
1398 addedWorkers->Add(slave);
1399 } else {
1400 slaveOk = kFALSE;
1401 fBadSlaves->Add(slave);
1402 Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1403 }
1404
1405 PDB(kGlobal,3)
1406 Info("AddWorkers", "worker on host %s created"
1407 " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1408
1409 // Notify opening of connection
1410 nSlavesDone++;
1412 m << TString("Opening connections to workers") << nSlaves
1413 << nSlavesDone << slaveOk;
1415
1416 ord++;
1417 } //end of the worker loop
1418 SafeDelete(dummysi);
1419
1420 // Cleanup
1421 SafeDelete(workerList);
1422
1423 nSlavesDone = 0;
1424
1425 // Here we finalize the server startup: in this way the bulk
1426 // of remote operations are almost parallelized
1427 TIter nxsl(addedWorkers);
1428 TSlave *sl = 0;
1429 while ((sl = (TSlave *) nxsl())) {
1430
1431 // Finalize setup of the server
1432 if (sl->IsValid())
1433 sl->SetupServ(TSlave::kSlave, 0);
1434
1435 // Monitor good slaves
1436 Bool_t slaveOk = kTRUE;
1437 if (sl->IsValid()) {
1438 fAllMonitor->Add(sl->GetSocket());
1439 PDB(kGlobal,3)
1440 Info("AddWorkers", "worker on host %s finalized"
1441 " and added to list", sl->GetOrdinal());
1442 } else {
1443 slaveOk = kFALSE;
1444 fBadSlaves->Add(sl);
1445 }
1446
1447 // Notify end of startup operations
1448 nSlavesDone++;
1450 m << TString("Setting up worker servers") << nSlaves
1451 << nSlavesDone << slaveOk;
1453 }
1454
1455 // Now set new state on the added workers (on all workers for simplicity)
1456 // use fEnabledPackages, fLoadedMacros,
1457 // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1458 // no need to load packages that are only loaded and not enabled (dyn mode)
1459 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1460 TNamed *n = 0;
1461 if (TProof::GetEnvVars() &&
1462 (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1463 TString s(n->GetTitle());
1464 if (s.IsDigit()) nwrk = s.Atoi();
1465 }
1466
1467 if (fDynamicStartup && goMoreParallel) {
1468
1469 PDB(kGlobal, 3)
1470 Info("AddWorkers", "will invoke GoMoreParallel()");
1471 Int_t nw = GoMoreParallel(nwrk);
1472 PDB(kGlobal, 3)
1473 Info("AddWorkers", "GoMoreParallel()=%d", nw);
1474
1475 }
1476 else {
1477 // Not in Dynamic Workers mode
1478 PDB(kGlobal, 3)
1479 Info("AddWorkers", "will invoke GoParallel()");
1480 GoParallel(nwrk, kFALSE, 0);
1481 }
1482
1483 // Set worker processing environment
1484 SetupWorkersEnv(addedWorkers, goMoreParallel);
1485
1486 // Update list of current workers
1487 PDB(kGlobal, 3)
1488 Info("AddWorkers", "will invoke SaveWorkerInfo()");
1490
1491 // Inform the client that the number of workers has changed
1492 if (fDynamicStartup && gProofServ) {
1493 PDB(kGlobal, 3)
1494 Info("AddWorkers", "will invoke SendParallel()");
1496
1497 if (goMoreParallel && fPlayer) {
1498 // In case we are adding workers dynamically to an existing process, we
1499 // should invoke a special player's Process() to set only added workers
1500 // to the proper state
1501 PDB(kGlobal, 3)
1502 Info("AddWorkers", "will send the PROCESS message to selected workers");
1503 fPlayer->JoinProcess(addedWorkers);
1504 // Update merger counters (new workers are not yet active)
1505 fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1506 }
1507 }
1508
1509 // Cleanup
1510 delete addedWorkers;
1511
1512 return 0;
1513}
1514
1515////////////////////////////////////////////////////////////////////////////////
1516/// Set up packages, loaded macros, include and lib paths ...
1517
1518void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1519{
// Replays on the workers the enabled packages, the loaded macros, the dynamic
// library path and the include path.
//   addedWorkers       list handed to the Upload/Enable/Load/Add*Path calls
//                      below to restrict them to the workers just added
//   increasingWorkers  when true together with fDynamicStartup, packages are
//                      uploaded and enabled on 'addedWorkers' only; otherwise
//                      the package calls are issued without a worker list
// Use the package list reported by gProofServ when there is one, otherwise
// the list returned by this object's GetEnabledPackages().
1520 TList *server_packs = gProofServ ? gProofServ->GetEnabledPackages() : nullptr;
1521 // Packages
1522 TList *packs = server_packs ? server_packs : GetEnabledPackages();
1523 if (packs && packs->GetSize() > 0) {
1524 TIter nxp(packs);
1525 TPair *pck = 0;
// Each entry is read as a TPair: its name identifies the package and its
// value is handed to EnablePackage() as a TList.
1526 while ((pck = (TPair *) nxp())) {
1527 // Upload and Enable methods are intelligent and avoid
1528 // re-uploading or re-enabling of a package to a node that has it.
1529 if (fDynamicStartup && increasingWorkers) {
1530 // Upload only on added workers
1531 PDB(kGlobal, 3)
1532 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
// The package is enabled only if the upload did not report an error (>= 0)
1533 if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1534 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1535 } else {
1536 PDB(kGlobal, 3)
1537 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1538 if (UploadPackage(pck->GetName()) >= 0)
1539 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1540 }
1541 }
1542 }
1543
// The list obtained from gProofServ (if any) is emptied and deleted here;
// the list returned by GetEnabledPackages() of this object is left alone.
1544 if (server_packs) {
1545 server_packs->Delete();
1546 delete server_packs;
1547 }
1548
1549 // Loaded macros
1550 if (fLoadedMacros) {
1551 TIter nxp(fLoadedMacros);
1552 TObjString *os = 0;
1553 while ((os = (TObjString *) nxp())) {
1554 PDB(kGlobal, 3) {
1555 Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1556 Printf("Loading a macro : %s", os->GetName());
1557 }
// Macros are always loaded with 'addedWorkers' as the target list
1558 Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1559 }
1560 }
1561
1562 // Dynamic path
// NOTE(review): the declaration/initialisation of 'dyn' (listing line 1563) is
// not present in this listing - presumably the current dynamic library path;
// restore it from the original file.
// ':' separators and double quotes are replaced by blanks before sending.
1564 dyn.ReplaceAll(":", " ");
1565 dyn.ReplaceAll("\"", " ");
1566 PDB(kGlobal, 3)
1567 Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1568 AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1569
1570 // Include path
// NOTE(review): the declaration/initialisation of 'inc' (listing line 1571) is
// not present in this listing - presumably the current include path; restore
// it from the original file.
// "-I" prefixes and double quotes are replaced by blanks before sending.
1572 inc.ReplaceAll("-I", " ");
1573 inc.ReplaceAll("\"", " ");
1574 PDB(kGlobal, 3)
1575 Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1576 AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1577
1578 // Done
1579 return;
1580}
1581
1582////////////////////////////////////////////////////////////////////////////////
1583/// Used for shutting down the workers after a query is finished.
1584/// Sends each of the workers from the workerList, a kPROOF_STOP message.
1585/// If the workerList == 0, shutdown all the workers.
1586
1588{
1589 if (!IsMaster()) {
1590 Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1591 return -1;
1592 }
1593
1594 fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1595
1596 if (!workerList) {
1597 // shutdown all the workers
1598 TIter nxsl(fSlaves);
1599 TSlave *sl = 0;
1600 while ((sl = (TSlave *) nxsl())) {
1601 // Shut down the worker assuming that it is not processing
1602 TerminateWorker(sl);
1603 }
1604
1605 } else {
1606 if (!(workerList->GetSize())) {
1607 Error("RemoveWorkers", "The list of workers should not be empty!");
1608 return -2;
1609 }
1610
1611 // Loop over all the workers and stop them
1612 TListIter next(workerList);
1613 TObject *to;
1614 TProofNodeInfo *worker;
1615 while ((to = next())) {
1616 TSlave *sl = 0;
1617 if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1618 // Get the next worker from the list
1619 worker = (TProofNodeInfo *)to;
1620 TIter nxsl(fSlaves);
1621 while ((sl = (TSlave *) nxsl())) {
1622 // Shut down the worker assuming that it is not processing
1623 if (sl->GetName() == worker->GetNodeName())
1624 break;
1625 }
1626 } else if (to->InheritsFrom(TSlave::Class())) {
1627 sl = (TSlave *) to;
1628 } else {
1629 Warning("RemoveWorkers","unknown object type: %s - it should be"
1630 " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1631 }
1632 // Shut down the worker assuming that it is not processing
1633 if (sl) {
1634 if (gDebug > 0)
1635 Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1636 TerminateWorker(sl);
1637 }
1638 }
1639 }
1640
1641 // Update also the master counter
1642 if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1643
1644 return 0;
1645}
1646
1647////////////////////////////////////////////////////////////////////////////////
1648/// Start up PROOF slaves.
1649
1651{
1652 // If this is a master server, find the config file and start slave
1653 // servers as specified in the config file
1655
1656 Int_t pc = 0;
1657 TList *workerList = new TList;
1658 // Get list of workers
1659 if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1660 TString emsg("no resource currently available for this session: please retry later");
1661 if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1663 return kFALSE;
1664 }
1665 // Setup the workers
1666 if (AddWorkers(workerList) < 0)
1667 return kFALSE;
1668
1669 } else {
1670
1671 // create master server
1672 Printf("Starting master: opening connection ...");
1673 TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1674
1675 if (slave->IsValid()) {
1676
1677 // Notify
1678 fprintf(stderr,"Starting master:"
1679 " connection open: setting up server ... \r");
1680 StartupMessage("Connection to master opened", kTRUE, 1, 1);
1681
1682 if (!attach) {
1683
1684 // Set worker interrupt handler
1685 slave->SetInterruptHandler(kTRUE);
1686
1687 // Finalize setup of the server
1689
1690 if (slave->IsValid()) {
1691
1692 // Notify
1693 Printf("Starting master: OK ");
1694 StartupMessage("Master started", kTRUE, 1, 1);
1695
1696 // check protocol compatibility
1697 // protocol 1 is not supported anymore
1698 if (fProtocol == 1) {
1699 Error("StartSlaves",
1700 "client and remote protocols not compatible (%d and %d)",
1702 slave->Close("S");
1703 delete slave;
1704 return kFALSE;
1705 }
1706
1707 fSlaves->Add(slave);
1708 fAllMonitor->Add(slave->GetSocket());
1709
1710 // Unset worker interrupt handler
1712
1713 // Set interrupt PROOF handler from now on
1715
1716 // Give-up after 5 minutes
1717 Int_t rc = Collect(slave, 300);
1718 Int_t slStatus = slave->GetStatus();
1719 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1720 fSlaves->Remove(slave);
1721 fAllMonitor->Remove(slave->GetSocket());
1722 if (slStatus == -99)
1723 Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1724 else if (slStatus == -98)
1725 Error("StartSlaves", "could not setup output redirection on master");
1726 else
1727 Error("StartSlaves", "setting up master");
1728 slave->Close("S");
1729 delete slave;
1730 return 0;
1731 }
1732
1733 if (!slave->IsValid()) {
1734 fSlaves->Remove(slave);
1735 fAllMonitor->Remove(slave->GetSocket());
1736 slave->Close("S");
1737 delete slave;
1738 Error("StartSlaves",
1739 "failed to setup connection with PROOF master server");
1740 return kFALSE;
1741 }
1742
1743 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1744 if ((fProgressDialog =
1745 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1746 if (fProgressDialog->LoadPlugin() == -1)
1747 fProgressDialog = 0;
1748 }
1749 } else {
1750 // Notify
1751 Printf("Starting master: failure");
1752 }
1753 } else {
1754
1755 // Notify
1756 Printf("Starting master: OK ");
1757 StartupMessage("Master attached", kTRUE, 1, 1);
1758
1759 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1760 if ((fProgressDialog =
1761 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1762 if (fProgressDialog->LoadPlugin() == -1)
1763 fProgressDialog = 0;
1764 }
1765
1766 fSlaves->Add(slave);
1768 }
1769
1770 } else {
1771 delete slave;
1772 // Notify only if verbosity is on: most likely the failure has already been notified
1773 if (gDebug > 0)
1774 Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1775 return kFALSE;
1776 }
1777 }
1778
1779 return kTRUE;
1780}
1781
1782////////////////////////////////////////////////////////////////////////////////
1783/// Close all open slave servers.
1784/// Client can decide to shutdown the remote session by passing option 'S'
1785/// or 's'. Default for clients is detach, if supported. Masters always
1786/// shutdown the remote counterpart.
1787
1789{
1790 { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1791
1792 fValid = kFALSE;
1793 if (fSlaves) {
1794 if (fIntHandler)
1796
1797 TIter nxs(fSlaves);
1798 TSlave *sl = 0;
1799 while ((sl = (TSlave *)nxs()))
1800 sl->Close(opt);
1801
1802 fActiveSlaves->Clear("nodelete");
1803 fUniqueSlaves->Clear("nodelete");
1804 fAllUniqueSlaves->Clear("nodelete");
1805 fNonUniqueMasters->Clear("nodelete");
1806 fBadSlaves->Clear("nodelete");
1807 fInactiveSlaves->Clear("nodelete");
1808 fSlaves->Delete();
1809 }
1810 }
1811
1813 gROOT->GetListOfSockets()->Remove(this);
1814
1815 if (fChains) {
1816 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1817 // remove "chain" from list
1818 chain->SetProof(0);
1819 RemoveChain(chain);
1820 }
1821 }
1822
1823 if (IsProofd()) {
1824
1825 gROOT->GetListOfProofs()->Remove(this);
1826 if (gProof && gProof == this) {
1827 // Set previous proofd-related as default
1828 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1829 while ((gProof = (TProof *)pvp())) {
1830 if (gProof->IsProofd())
1831 break;
1832 }
1833 }
1834 }
1835 }
1836}
1837
1838////////////////////////////////////////////////////////////////////////////////
1839/// Create a new TSlave of type TSlave::kSlave.
1840/// Note: creation of TSlave is private with TProof as a friend.
1841/// Derived classes must use this function to create slaves.
1842
1843TSlave *TProof::CreateSlave(const char *url, const char *ord,
1844 Int_t perf, const char *image, const char *workdir)
1845{
1846 TSlave* sl = TSlave::Create(url, ord, perf, image,
1847 this, TSlave::kSlave, workdir, 0);
1848
1849 if (sl->IsValid()) {
1850 sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1851 // must set fParallel to 1 for slaves since they do not
1852 // report their fParallel with a LOG_DONE message
1853 sl->fParallel = 1;
1854 }
1855
1856 return sl;
1857}
1858
1859
1860////////////////////////////////////////////////////////////////////////////////
1861/// Create a new TSlave of type TSlave::kMaster.
1862/// Note: creation of TSlave is private with TProof as a friend.
1863/// Derived classes must use this function to create slaves.
1864
1865TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1866 const char *image, const char *msd, Int_t nwk)
1867{
1868 TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1869 TSlave::kMaster, 0, msd, nwk);
1870
1871 if (sl->IsValid()) {
1872 sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1873 }
1874
1875 return sl;
1876}
1877
1878////////////////////////////////////////////////////////////////////////////////
1879/// Find slave that has TSocket s. Returns 0 in case slave is not found.
1880
1882{
1883 TSlave *sl;
1884 TIter next(fSlaves);
1885
1886 while ((sl = (TSlave *)next())) {
1887 if (sl->IsValid() && sl->GetSocket() == s)
1888 return sl;
1889 }
1890 return 0;
1891}
1892
1893////////////////////////////////////////////////////////////////////////////////
1894/// Add to the fUniqueSlave list the active slaves that have a unique
1895/// (user) file system image. This information is used to transfer files
1896/// only once to nodes that share a file system (an image). Submasters
1897/// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1898/// list. That list is used to trigger the transferring of files to
1899/// the submaster's unique slaves without the need to transfer the file
1900/// to the submaster.
1901
1903{
1909
1910 TIter next(fActiveSlaves);
1911
1912 while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1913 if (fImage == sl->fImage) {
1914 if (sl->GetSlaveType() == TSlave::kMaster) {
1916 fAllUniqueSlaves->Add(sl);
1917 fAllUniqueMonitor->Add(sl->GetSocket());
1918 }
1919 continue;
1920 }
1921
1922 TIter next2(fUniqueSlaves);
1923 TSlave *replace_slave = 0;
1924 Bool_t add = kTRUE;
1925 while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1926 if (sl->fImage == sl2->fImage) {
1927 add = kFALSE;
1928 if (sl->GetSlaveType() == TSlave::kMaster) {
1929 if (sl2->GetSlaveType() == TSlave::kSlave) {
1930 // give preference to master
1931 replace_slave = sl2;
1932 add = kTRUE;
1933 } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1935 fAllUniqueSlaves->Add(sl);
1936 fAllUniqueMonitor->Add(sl->GetSocket());
1937 } else {
1938 Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1939 R__ASSERT(0);
1940 }
1941 }
1942 break;
1943 }
1944 }
1945
1946 if (add) {
1947 fUniqueSlaves->Add(sl);
1948 fAllUniqueSlaves->Add(sl);
1949 fUniqueMonitor->Add(sl->GetSocket());
1950 fAllUniqueMonitor->Add(sl->GetSocket());
1951 if (replace_slave) {
1952 fUniqueSlaves->Remove(replace_slave);
1953 fAllUniqueSlaves->Remove(replace_slave);
1954 fUniqueMonitor->Remove(replace_slave->GetSocket());
1955 fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1956 }
1957 }
1958 }
1959
1960 // will be activated in Collect()
1963}
1964
1965////////////////////////////////////////////////////////////////////////////////
1966/// Return number of slaves as described in the config file.
1967
1969{
1970 return fSlaves->GetSize();
1971}
1972
1973////////////////////////////////////////////////////////////////////////////////
1974/// Return number of active slaves, i.e. slaves that are valid and in
1975/// the current computing group.
1976
1978{
1979 return fActiveSlaves->GetSize();
1980}
1981
1982////////////////////////////////////////////////////////////////////////////////
1983/// Return number of inactive slaves, i.e. slaves that are valid but not in
1984/// the current computing group.
1985
1987{
1988 return fInactiveSlaves->GetSize();
1989}
1990
1991////////////////////////////////////////////////////////////////////////////////
1992/// Return number of unique slaves, i.e. active slaves that have each a
1993/// unique different user files system.
1994
1996{
1997 return fUniqueSlaves->GetSize();
1998}
1999
2000////////////////////////////////////////////////////////////////////////////////
2001/// Return number of bad slaves. These are slaves that were in the config
2002/// file, but refused to start up or that died during the PROOF session.
2003
2005{
2006 return fBadSlaves->GetSize();
2007}
2008
2009////////////////////////////////////////////////////////////////////////////////
2010/// Ask for the statistics of the slaves.
2011
2013{
2014 if (!IsValid()) return;
2015
2018}
2019
2020////////////////////////////////////////////////////////////////////////////////
2021/// Get statistics about CPU time, real time and bytes read.
2022/// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2023/// and GetBytesRead())
2024
2026{
2027 if (fProtocol > 27) {
2028 // This returns the correct result
2029 AskStatistics();
2030 } else {
2031 // AskStatistics is buggy: parse the output of Print()
2034 Print();
2035 gSystem->RedirectOutput(0, 0, &rh);
2036 TMacro *mp = GetLastLog();
2037 if (mp) {
2038 // Look for global directories
2039 TIter nxl(mp->GetListOfLines());
2040 TObjString *os = 0;
2041 while ((os = (TObjString *) nxl())) {
2042 TString s(os->GetName());
2043 if (s.Contains("Total MB's processed:")) {
2044 s.ReplaceAll("Total MB's processed:", "");
2045 if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2046 } else if (s.Contains("Total real time used (s):")) {
2047 s.ReplaceAll("Total real time used (s):", "");
2048 if (s.IsFloat()) fRealTime = s.Atof();
2049 } else if (s.Contains("Total CPU time used (s):")) {
2050 s.ReplaceAll("Total CPU time used (s):", "");
2051 if (s.IsFloat()) fCpuTime = s.Atof();
2052 }
2053 }
2054 delete mp;
2055 }
2056 }
2057
2058 if (verbose) {
2059 Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2060 GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2061 }
2062}
2063
2064////////////////////////////////////////////////////////////////////////////////
2065/// Ask for the number of parallel slaves.
2066
2068{
2069 if (!IsValid()) return;
2070
2073}
2074
2075////////////////////////////////////////////////////////////////////////////////
2076/// Ask the master for the list of queries.
2077
2079{
2080 if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2081
2082 Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
2084 m << all;
2087
2088 // This should have been filled by now
2089 return fQueries;
2090}
2091
2092////////////////////////////////////////////////////////////////////////////////
2093/// Number of queries processed by this session
2094
2096{
2097 if (fQueries)
2098 return fQueries->GetSize() - fOtherQueries;
2099 return 0;
2100}
2101
2102////////////////////////////////////////////////////////////////////////////////
2103/// Set max number of draw queries whose results are saved
2104
2106{
2107 if (max > 0) {
2108 if (fPlayer)
2110 fMaxDrawQueries = max;
2111 }
2112}
2113
2114////////////////////////////////////////////////////////////////////////////////
2115/// Get max number of queries whose full results are kept in the
2116/// remote sandbox
2117
2119{
2121 m << kFALSE;
2124}
2125
2126////////////////////////////////////////////////////////////////////////////////
2127/// Return pointer to the list of query results in the player
2128
2130{
2131 return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2132}
2133
2134////////////////////////////////////////////////////////////////////////////////
2135/// Return pointer to the full TQueryResult instance owned by the player
2136/// and referenced by 'ref'. If ref = 0 or "", return the last query result.
2137
2139{
2140 return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2141}
2142
2143////////////////////////////////////////////////////////////////////////////////
2144/// Ask the master for the list of queries.
2145/// Options:
2146/// "A" show information about all the queries known to the
2147/// server, i.e. even those processed by other sessions
2148/// "L" show only information about queries locally available
2149/// i.e. already retrieved. If "L" is specified, "A" is
2150/// ignored.
2151/// "F" show all details available about queries
2152/// "H" print help menu
2153/// Default ""
2154
2156{
2157 Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2158 if (help) {
2159
2160 // Help
2161
2162 Printf("+++");
2163 Printf("+++ Options: \"A\" show all queries known to server");
2164 Printf("+++ \"L\" show retrieved queries");
2165 Printf("+++ \"F\" full listing of query info");
2166 Printf("+++ \"H\" print this menu");
2167 Printf("+++");
2168 Printf("+++ (case insensitive)");
2169 Printf("+++");
2170 Printf("+++ Use Retrieve(<#>) to retrieve the full"
2171 " query results from the master");
2172 Printf("+++ e.g. Retrieve(8)");
2173
2174 Printf("+++");
2175
2176 return;
2177 }
2178
2179 if (!IsValid()) return;
2180
2181 Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2182
2183 TObject *pq = 0;
2184 if (!local) {
2185 GetListOfQueries(opt);
2186
2187 if (!fQueries) return;
2188
2189 TIter nxq(fQueries);
2190
2191 // Queries processed by other sessions
2192 if (fOtherQueries > 0) {
2193 Printf("+++");
2194 Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2195 Int_t nq = 0;
2196 while (nq++ < fOtherQueries && (pq = nxq()))
2197 pq->Print(opt);
2198 }
2199
2200 // Queries processed by this session
2201 Printf("+++");
2202 Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2204 while ((pq = nxq()))
2205 pq->Print(opt);
2206
2207 } else {
2208
2209 // Queries processed by this session
2210 Printf("+++");
2211 Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2213
2214 // Queries available locally
2215 TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2216 if (listlocal) {
2217 Printf("+++");
2218 Printf("+++ Queries available locally: %d", listlocal->GetSize());
2219 TIter nxlq(listlocal);
2220 while ((pq = nxlq()))
2221 pq->Print(opt);
2222 }
2223 }
2224 Printf("+++");
2225}
2226
2227////////////////////////////////////////////////////////////////////////////////
2228/// See if the data is ready to be analyzed.
2229
2231{
2232 if (!IsValid()) return kFALSE;
2233
2234 TList submasters;
2235 TIter nextSlave(GetListOfActiveSlaves());
2236 while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2237 if (sl->GetSlaveType() == TSlave::kMaster) {
2238 submasters.Add(sl);
2239 }
2240 }
2241
2242 fDataReady = kTRUE; //see if any submasters set it to false
2243 fBytesReady = 0;
2244 fTotalBytes = 0;
2245 //loop over submasters and see if data is ready
2246 if (submasters.GetSize() > 0) {
2247 Broadcast(kPROOF_DATA_READY, &submasters);
2248 Collect(&submasters);
2249 }
2250
2251 bytesready = fBytesReady;
2252 totalbytes = fTotalBytes;
2253
2254 EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2255
2256 PDB(kGlobal,2)
2257 Info("IsDataReady", "%lld / %lld (%s)",
2258 bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2259
2260 return fDataReady;
2261}
2262
2263////////////////////////////////////////////////////////////////////////////////
2264/// Send interrupt to master or slave servers.
2265
2267{
2268 if (!IsValid()) return;
2269
2270 TList *slaves = 0;
2271 if (list == kAll) slaves = fSlaves;
2272 if (list == kActive) slaves = fActiveSlaves;
2273 if (list == kUnique) slaves = fUniqueSlaves;
2274 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2275
2276 if (slaves->GetSize() == 0) return;
2277
2278 TSlave *sl;
2279 TIter next(slaves);
2280
2281 while ((sl = (TSlave *)next())) {
2282 if (sl->IsValid()) {
2283
2284 // Ask slave to propagate the interrupt request
2285 sl->Interrupt((Int_t)type);
2286 }
2287 }
2288}
2289
2290////////////////////////////////////////////////////////////////////////////////
2291/// Returns number of slaves active in parallel mode. Returns 0 in case
2292/// there are no active slaves. Returns -1 in case of error.
2293
2295{
2296 if (!IsValid()) return -1;
2297
2298 // iterate over active slaves and return total number of slaves
2299 TIter nextSlave(GetListOfActiveSlaves());
2300 Int_t nparallel = 0;
2301 while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2302 if (sl->GetParallel() >= 0)
2303 nparallel += sl->GetParallel();
2304
2305 return nparallel;
2306}
2307
2308////////////////////////////////////////////////////////////////////////////////
2309/// Returns list of TSlaveInfo's. In case of error return 0.
2310
2312{
2313 if (!IsValid()) return 0;
2314
2315 if (fSlaveInfo == 0) {
2318 } else {
2319 fSlaveInfo->Delete();
2320 }
2321
2322 TList masters;
2323 TIter next(GetListOfSlaves());
2324 TSlave *slave;
2325
2326 while ((slave = (TSlave *) next()) != 0) {
2327 if (slave->GetSlaveType() == TSlave::kSlave) {
2328 const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2329 TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2330 name,
2331 slave->GetPerfIdx());
2332 fSlaveInfo->Add(slaveinfo);
2333
2334 TIter nextactive(GetListOfActiveSlaves());
2335 TSlave *activeslave;
2336 while ((activeslave = (TSlave *) nextactive())) {
2337 if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2338 slaveinfo->SetStatus(TSlaveInfo::kActive);
2339 break;
2340 }
2341 }
2342
2343 TIter nextbad(GetListOfBadSlaves());
2344 TSlave *badslave;
2345 while ((badslave = (TSlave *) nextbad())) {
2346 if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2347 slaveinfo->SetStatus(TSlaveInfo::kBad);
2348 break;
2349 }
2350 }
2351 // Get system info if supported
2352 if (slave->IsValid()) {
2353 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2354 MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2355 else
2356 masters.Add(slave);
2357 }
2358
2359 } else if (slave->GetSlaveType() == TSlave::kMaster) {
2360 if (slave->IsValid()) {
2361 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2362 MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2363 else
2364 masters.Add(slave);
2365 }
2366 } else {
2367 Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2368 R__ASSERT(0);
2369 }
2370 }
2371 if (masters.GetSize() > 0) Collect(&masters);
2372
2373 return fSlaveInfo;
2374}
2375
2376////////////////////////////////////////////////////////////////////////////////
2377/// Activate slave server list.
2378
2380{
2381 TMonitor *mon = fAllMonitor;
2382 mon->DeActivateAll();
2383
2384 slaves = !slaves ? fActiveSlaves : slaves;
2385
2386 TIter next(slaves);
2387 TSlave *sl;
2388 while ((sl = (TSlave*) next())) {
2389 if (sl->IsValid())
2390 mon->Activate(sl->GetSocket());
2391 }
2392}
2393
2394////////////////////////////////////////////////////////////////////////////////
2395/// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2396/// monitored by 'mon'.
2397
2399{
2400 TMonitor *m = (mon) ? mon : fCurrentMonitor;
2401 if (m) {
2402 if (on)
2403 m->ActivateAll();
2404 else
2405 m->DeActivateAll();
2406 }
2407}
2408
2409////////////////////////////////////////////////////////////////////////////////
2410/// Broadcast the group priority to all workers in the specified list. Returns
2411/// the number of workers the message was successfully sent to.
2412/// Returns -1 in case of error.
2413
2414Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2415{
2416 if (!IsValid()) return -1;
2417
2418 if (workers->GetSize() == 0) return 0;
2419
2420 int nsent = 0;
2421 TIter next(workers);
2422
2423 TSlave *wrk;
2424 while ((wrk = (TSlave *)next())) {
2425 if (wrk->IsValid()) {
2426 if (wrk->SendGroupPriority(grp, priority) == -1)
2427 MarkBad(wrk, "could not send group priority");
2428 else
2429 nsent++;
2430 }
2431 }
2432
2433 return nsent;
2434}
2435
2436////////////////////////////////////////////////////////////////////////////////
2437/// Broadcast the group priority to all workers in the specified list. Returns
2438/// the number of workers the message was successfully sent to.
2439/// Returns -1 in case of error.
2440
2441Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2442{
2443 TList *workers = 0;
2444 if (list == kAll) workers = fSlaves;
2445 if (list == kActive) workers = fActiveSlaves;
2446 if (list == kUnique) workers = fUniqueSlaves;
2447 if (list == kAllUnique) workers = fAllUniqueSlaves;
2448
2449 return BroadcastGroupPriority(grp, priority, workers);
2450}
2451
2452////////////////////////////////////////////////////////////////////////////////
2453/// Reset the merge progress notificator
2454
2456{
2458}
2459
2460////////////////////////////////////////////////////////////////////////////////
2461/// Broadcast a message to all slaves in the specified list. Returns
2462/// the number of slaves the message was successfully sent to.
2463/// Returns -1 in case of error.
2464
2466{
2467 if (!IsValid()) return -1;
2468
2469 if (!slaves || slaves->GetSize() == 0) return 0;
2470
2471 int nsent = 0;
2472 TIter next(slaves);
2473
2474 TSlave *sl;
2475 while ((sl = (TSlave *)next())) {
2476 if (sl->IsValid()) {
2477 if (sl->GetSocket()->Send(mess) == -1)
2478 MarkBad(sl, "could not broadcast request");
2479 else
2480 nsent++;
2481 }
2482 }
2483
2484 return nsent;
2485}
2486
2487////////////////////////////////////////////////////////////////////////////////
2488/// Broadcast a message to all slaves in the specified list (either
2489/// all slaves or only the active slaves). Returns the number of slaves
2490/// the message was successfully sent to. Returns -1 in case of error.
2491
2493{
2494 TList *slaves = 0;
2495 if (list == kAll) slaves = fSlaves;
2496 if (list == kActive) slaves = fActiveSlaves;
2497 if (list == kUnique) slaves = fUniqueSlaves;
2498 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2499
2500 return Broadcast(mess, slaves);
2501}
2502
2503////////////////////////////////////////////////////////////////////////////////
2504/// Broadcast a character string buffer to all slaves in the specified
2505/// list. Use kind to set the TMessage what field. Returns the number of
2506/// slaves the message was sent to. Returns -1 in case of error.
2507
2508Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2509{
2510 TMessage mess(kind);
2511 if (str) mess.WriteString(str);
2512 return Broadcast(mess, slaves);
2513}
2514
2515////////////////////////////////////////////////////////////////////////////////
2516/// Broadcast a character string buffer to all slaves in the specified
2517/// list (either all slaves or only the active slaves). Use kind to
2518/// set the TMessage what field. Returns the number of slaves the message
2519/// was sent to. Returns -1 in case of error.
2520
2521Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2522{
2523 TMessage mess(kind);
2524 if (str) mess.WriteString(str);
2525 return Broadcast(mess, list);
2526}
2527
2528////////////////////////////////////////////////////////////////////////////////
2529/// Broadcast an object to all slaves in the specified list. Use kind to
2530/// set the TMessage what field. Returns the number of slaves the message
2531/// was sent to. Returns -1 in case of error.
2532
2534{
2535 TMessage mess(kind);
2536 mess.WriteObject(obj);
2537 return Broadcast(mess, slaves);
2538}
2539
2540////////////////////////////////////////////////////////////////////////////////
2541/// Broadcast an object to all slaves in the specified list. Use kind to
2542/// set the TMessage what field. Returns the number of slaves the message
2543/// was sent to. Returns -1 in case of error.
2544
2546{
2547 TMessage mess(kind);
2548 mess.WriteObject(obj);
2549 return Broadcast(mess, list);
2550}
2551
2552////////////////////////////////////////////////////////////////////////////////
2553/// Broadcast a raw buffer of specified length to all slaves in the
2554/// specified list. Returns the number of slaves the buffer was sent to.
2555/// Returns -1 in case of error.
2556
2557Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2558{
2559 if (!IsValid()) return -1;
2560
2561 if (slaves->GetSize() == 0) return 0;
2562
2563 int nsent = 0;
2564 TIter next(slaves);
2565
2566 TSlave *sl;
2567 while ((sl = (TSlave *)next())) {
2568 if (sl->IsValid()) {
2569 if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2570 MarkBad(sl, "could not send broadcast-raw request");
2571 else
2572 nsent++;
2573 }
2574 }
2575
2576 return nsent;
2577}
2578
2579////////////////////////////////////////////////////////////////////////////////
2580/// Broadcast a raw buffer of specified length to all slaves in the
2581/// specified list. Returns the number of slaves the buffer was sent to.
2582/// Returns -1 in case of error.
2583
2584Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
2585{
2586 TList *slaves = 0;
2587 if (list == kAll) slaves = fSlaves;
2588 if (list == kActive) slaves = fActiveSlaves;
2589 if (list == kUnique) slaves = fUniqueSlaves;
2590 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2591
2592 return BroadcastRaw(buffer, length, slaves);
2593}
2594
2595////////////////////////////////////////////////////////////////////////////////
2596/// Broadcast file to all workers in the specified list. Returns the number of workers
2597/// the buffer was sent to.
2598/// Returns -1 in case of error.
2599
2600Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2601{
2602 if (!IsValid()) return -1;
2603
2604 if (wrks->GetSize() == 0) return 0;
2605
2606 int nsent = 0;
2607 TIter next(wrks);
2608
2609 TSlave *wrk;
2610 while ((wrk = (TSlave *)next())) {
2611 if (wrk->IsValid()) {
2612 if (SendFile(file, opt, rfile, wrk) < 0)
2613 Error("BroadcastFile",
2614 "problems sending file to worker %s (%s)",
2615 wrk->GetOrdinal(), wrk->GetName());
2616 else
2617 nsent++;
2618 }
2619 }
2620
2621 return nsent;
2622}
2623
2624////////////////////////////////////////////////////////////////////////////////
2625/// Broadcast file to all workers in the specified list. Returns the number of workers
2626/// the buffer was sent to.
2627/// Returns -1 in case of error.
2628
2629Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2630{
2631 TList *wrks = 0;
2632 if (list == kAll) wrks = fSlaves;
2633 if (list == kActive) wrks = fActiveSlaves;
2634 if (list == kUnique) wrks = fUniqueSlaves;
2635 if (list == kAllUnique) wrks = fAllUniqueSlaves;
2636
2637 return BroadcastFile(file, opt, rfile, wrks);
2638}
2639
2640////////////////////////////////////////////////////////////////////////////////
2641/// Release a monitor after use, making sure to delete it if it was newly
2642/// created (i.e. if it is not one of the four standard monitors).
2643
2645{
2646 if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2647 && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2648 delete mon;
2649 }
2650}
2651
2652////////////////////////////////////////////////////////////////////////////////
2653/// Collect responses from slave sl. Returns the number of slaves that
2654/// responded (=1).
2655/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2656/// which means wait forever).
2657/// If defined (>= 0) endtype is the message that stops this collection.
2658
2659Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2660{
2661 Int_t rc = 0;
2662
2663 TMonitor *mon = 0;
2664 if (!sl->IsValid()) return 0;
2665
2667 mon = new TMonitor;
2668 } else {
2669 mon = fAllMonitor;
2670 mon->DeActivateAll();
2671 }
2672 mon->Activate(sl->GetSocket());
2673
2674 rc = Collect(mon, timeout, endtype, deactonfail);
2675 ReleaseMonitor(mon);
2676 return rc;
2677}
2678
2679////////////////////////////////////////////////////////////////////////////////
2680/// Collect responses from the slave servers. Returns the number of slaves
2681/// that responded.
2682/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2683/// which means wait forever).
2684/// If defined (>= 0) endtype is the message that stops this collection.
2685
2686Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2687{
2688 Int_t rc = 0;
2689
2690 TMonitor *mon = 0;
2691
2693 mon = new TMonitor;
2694 } else {
2695 mon = fAllMonitor;
2696 mon->DeActivateAll();
2697 }
2698 TIter next(slaves);
2699 TSlave *sl;
2700 while ((sl = (TSlave*) next())) {
2701 if (sl->IsValid())
2702 mon->Activate(sl->GetSocket());
2703 }
2704
2705 rc = Collect(mon, timeout, endtype, deactonfail);
2706 ReleaseMonitor(mon);
2707 return rc;
2708}
2709
2710////////////////////////////////////////////////////////////////////////////////
2711/// Collect responses from the slave servers. Returns the number of slaves
2712/// that responded.
2713/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2714/// which means wait forever).
2715/// If defined (>= 0) endtype is the message that stops this collection.
2716
2717Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2718{
2719 Int_t rc = 0;
2720 TMonitor *mon = 0;
2721
2722 if (list == kAll) mon = fAllMonitor;
2723 if (list == kActive) mon = fActiveMonitor;
2724 if (list == kUnique) mon = fUniqueMonitor;
2725 if (list == kAllUnique) mon = fAllUniqueMonitor;
2726 if (fCurrentMonitor == mon) {
2727 // Get a copy
2728 mon = new TMonitor(*mon);
2729 }
2730 mon->ActivateAll();
2731
2732 rc = Collect(mon, timeout, endtype, deactonfail);
2733 ReleaseMonitor(mon);
2734 return rc;
2735}
2736
2737////////////////////////////////////////////////////////////////////////////////
2738/// Collect responses from the slave servers. Returns the number of messages
2739/// received. Can be 0 if there are no active slaves.
2740/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2741/// which means wait forever).
2742/// If defined (>= 0) endtype is the message that stops this collection.
2743/// Collect also stops its execution from time to time to check for new
2744/// workers in Dynamic Startup mode.
2745
2746Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2747{
2748 Int_t collectId = gRandom->Integer(9999);
2749
2750 PDB(kCollect, 3)
2751 Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2752
2753 // Reset the status flag and clear the messages in the list, if any
2754 fStatus = 0;
2756
2757 Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2758
2759 if (!mon->GetActive(actto)) return 0;
2760
2762
2763 // Used by external code to know what we are monitoring
2764 TMonitor *savedMonitor = 0;
2765 if (fCurrentMonitor) {
2766 savedMonitor = fCurrentMonitor;
2767 fCurrentMonitor = mon;
2768 } else {
2769 fCurrentMonitor = mon;
2770 fBytesRead = 0;
2771 fRealTime = 0.0;
2772 fCpuTime = 0.0;
2773 }
2774
2775 // We want messages on the main window during synchronous collection,
2776 // but we save the present status to restore it at the end
2777 Bool_t saveRedirLog = fRedirLog;
2778 if (!IsIdle() && !IsSync())
2779 fRedirLog = kFALSE;
2780
2781 int cnt = 0, rc = 0;
2782
2783 // Timeout counter
2784 Long_t nto = timeout;
2785 PDB(kCollect, 2)
2786 Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2787
2788 // On clients, handle Ctrl-C during collection
2789 if (fIntHandler)
2790 fIntHandler->Add();
2791
2792 // Sockets w/o activity during the last 'sto' millisecs are deactivated
2793 Int_t nact = 0;
2794 Long_t sto = -1;
2795 Int_t nsto = 60;
2796 Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2797 mon->ResetInterrupt();
2798 while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2799
2800 // Dump last waiting sockets, if in debug mode
2801 PDB(kCollect, 2) {
2802 if (nact < 4) {
2803 TList *al = mon->GetListOfActives();
2804 if (al && al->GetSize() > 0) {
2805 Info("Collect"," %d node(s) still active:", al->GetSize());
2806 TIter nxs(al);
2807 TSocket *xs = 0;
2808 while ((xs = (TSocket *)nxs())) {
2809 TSlave *wrk = FindSlave(xs);
2810 if (wrk)
2811 Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2812 else
2813 Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2814 xs->GetInetAddress().GetPort());
2815 }
2816 }
2817 delete al;
2818 }
2819 }
2820
2821 // Preemptive poll for new workers on the master only in Dynamic Mode and only
2822 // during processing (TODO: should work on Top Master only)
2824 ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2827 fLastPollWorkers_s = time(0);
2829 PDB(kCollect, 1)
2830 Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2831 }
2832
2833 // Wait for a ready socket
2834 PDB(kCollect, 3)
2835 Info("Collect", "Will invoke Select() #%04d", collectId);
2836 TSocket *s = mon->Select(1000);
2837
2838 if (s && s != (TSocket *)(-1)) {
2839 // Get and analyse the info it did receive
2840 rc = CollectInputFrom(s, endtype, deactonfail);
2841 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2842 // Deactivate it if we are done with it
2843 mon->DeActivate(s);
2844 TList *al = mon->GetListOfActives();
2845 PDB(kCollect, 2)
2846 Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2847 s, mon->GetActive(),
2848 al->First());
2849 delete al;
2850 } else if (rc == 2) {
2851 // This end message was for the saved monitor
2852 // Deactivate it if we are done with it
2853 if (savedMonitor) {
2854 savedMonitor->DeActivate(s);
2855 TList *al = mon->GetListOfActives();
2856 PDB(kCollect, 2)
2857 Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2858 s, savedMonitor->GetActive(),
2859 al->First());
2860 delete al;
2861 }
2862 }
2863
2864 // Update counter (if no error occurred)
2865 if (rc >= 0)
2866 cnt++;
2867 } else {
2868 // If not timed-out, exit if not stopped or not aborted
2869 // (player exits status is finished in such a case); otherwise,
2870 // we still need to collect the partial output info
2871 if (!s)
2873 mon->DeActivateAll();
2874 // Decrease the timeout counter if requested
2875 if (s == (TSocket *)(-1) && nto > 0)
2876 nto--;
2877 }
2878
2879 // Check if there are workers with ready output to be sent and ask the first to send it
2881 // Maximum number of concurrent sendings
2882 Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2883 if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2884 mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2885 TIter nxwr(fWrksOutputReady);
2886 TSlave *wrk = 0;
2887 while (mxws && (wrk = (TSlave *) nxwr())) {
2888 if (!wrk->TestBit(TSlave::kOutputRequested)) {
2889 // Ask worker for output
2890 TMessage sendoutput(kPROOF_SENDOUTPUT);
2891 PDB(kCollect, 2)
2892 Info("Collect", "worker %s was asked to send its output to master",
2893 wrk->GetOrdinal());
2894 if (wrk->GetSocket()->Send(sendoutput) != 1) {
2896 mxws--;
2897 }
2898 } else {
2899 // Count
2900 mxws--;
2901 }
2902 }
2903 }
2904
2905 // Check if we need to check the socket activity (we do it every 60 cycles, ~ 60 sec when idle)
2906 sto = -1;
2907 if (--nsto <= 0) {
2908 sto = (Long_t) actto;
2909 nsto = 60;
2910 }
2911
2912 } // end loop over active monitors
2913
2914 // If timed-out, deactivate the remaining sockets
2915 if (nto == 0) {
2916 TList *al = mon->GetListOfActives();
2917 if (al && al->GetSize() > 0) {
2918 // Notify the name of those which did timeout
2919 Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2920 TIter nxs(al);
2921 TSocket *xs = 0;
2922 while ((xs = (TSocket *)nxs())) {
2923 TSlave *wrk = FindSlave(xs);
2924 if (wrk)
2925 Info("Collect"," %s", wrk->GetName());
2926 else
2927 Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2928 xs->GetInetAddress().GetPort());
2929 }
2930 }
2931 delete al;
2932 mon->DeActivateAll();
2933 }
2934
2935 // Deactivate Ctrl-C special handler
2936 if (fIntHandler)
2938
2939 // make sure group view is up to date
2940 SendGroupView();
2941
2942 // Restore redirection setting
2943 fRedirLog = saveRedirLog;
2944
2945 // Restore the monitor
2946 fCurrentMonitor = savedMonitor;
2947
2949
2950 PDB(kCollect, 3)
2951 Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2952
2953 return cnt;
2954}
2955
2956////////////////////////////////////////////////////////////////////////////////
2957/// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2958/// them. Returns the number of new workers found, or <0 on errors.
2959
2961{
2962 // Requests for worker updates
2963 Int_t dummy = 0;
2964 TList *reqWorkers = new TList();
2965 reqWorkers->SetOwner(kFALSE);
2966
2967 if (!TestBit(TProof::kIsMaster)) {
2968 Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2969 return -1;
2970 }
2971 if (!gProofServ) {
2972 Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2973 return -1;
2974 }
2975
2976 gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2977
2978 // List of new workers only (TProofNodeInfo)
2979 TList *newWorkers = new TList();
2980 newWorkers->SetOwner(kTRUE);
2981
2982 TIter next(reqWorkers);
2983 TProofNodeInfo *ni;
2984 TString fullOrd;
2985 while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2986
2987 // Form the full ordinal
2988 fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2989
2990 TIter nextInner(fSlaves);
2991 TSlave *sl;
2992 Bool_t found = kFALSE;
2993 while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2994 if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
2995 found = kTRUE;
2996 break;
2997 }
2998 }
2999
3000 if (found) delete ni;
3001 else {
3002 newWorkers->Add(ni);
3003 PDB(kGlobal, 1)
3004 Info("PollForNewWorkers", "New worker found: %s:%s",
3005 ni->GetNodeName().Data(), fullOrd.Data());
3006 }
3007 }
3008
3009 delete reqWorkers; // not owner
3010
3011 Int_t nNewWorkers = newWorkers->GetEntries();
3012
3013 // Add the new workers
3014 if (nNewWorkers > 0) {
3015 PDB(kGlobal, 1)
3016 Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
3017 Int_t rv = AddWorkers(newWorkers);
3018 if (rv < 0) {
3019 Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
3020 return -1;
3021 }
3022 // Don't delete newWorkers: AddWorkers() will do that
3023 }
3024 else {
3025 PDB(kGlobal, 2)
3026 Info("PollForNewWorkers", "No new worker found");
3027 delete newWorkers;
3028 }
3029
3030 return nNewWorkers;
3031}
3032
3033////////////////////////////////////////////////////////////////////////////////
3034/// Remove links to objects in list 'ol' from gDirectory
3035
3037{
3038 if (ol) {
3039 TIter nxo(ol);
3040 TObject *o = 0;
3041 while ((o = nxo()))
3042 gDirectory->RecursiveRemove(o);
3043 }
3044}
3045
3046////////////////////////////////////////////////////////////////////////////////
3047/// Collect and analyze available input from socket s.
3048/// Returns 0 on success, 1 or 2 if done with this socket (2: last message was not of type 'endtype'), -1 on failure.
3049
3051{
3052 TMessage *mess;
3053
3054 Int_t recvrc = 0;
3055 if ((recvrc = s->Recv(mess)) < 0) {
3056 PDB(kCollect,2)
3057 Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3058 Bool_t bad = kTRUE;
3059 if (recvrc == -5) {
3060 // Broken connection: try reconnection
3062 if (s->Reconnect() == 0) {
3064 bad = kFALSE;
3065 }
3066 }
3067 if (bad)
3068 MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3069 // Ignore this wake up
3070 return -1;
3071 }
3072 if (!mess) {
3073 // we get here in case the remote server died
3074 MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3075 return -1;
3076 }
3077 Int_t rc = 0;
3078
3079 Int_t what = mess->What();
3080 TSlave *sl = FindSlave(s);
3081 rc = HandleInputMessage(sl, mess, deactonfail);
3082 if (rc == 1 && (endtype >= 0) && (what != endtype))
3083 // This message was for the base monitor in recursive case
3084 rc = 2;
3085
3086 // We are done successfully
3087 return rc;
3088}
3089
3090////////////////////////////////////////////////////////////////////////////////
3091/// Analyze the received message.
3092/// Returns 0 on success (1 if this is the last message from this socket), -1 if
3093/// any failure occurs.
3094
3096{
3097 char str[512];
3098 TObject *obj;
3099 Int_t rc = 0;
3100
3101 if (!mess || !sl) {
3102 Warning("HandleInputMessage", "given an empty message or undefined worker");
3103 return -1;
3104 }
3105 Bool_t delete_mess = kTRUE;
3106 TSocket *s = sl->GetSocket();
3107 if (!s) {
3108 Warning("HandleInputMessage", "worker socket is undefined");
3109 return -1;
3110 }
3111
3112 // The message type
3113 Int_t what = mess->What();
3114
3115 PDB(kCollect,3)
3116 Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3117
3118 switch (what) {
3119
3120 case kMESS_OK:
3121 // Add the message to the list
3122 fRecvMessages->Add(mess);
3123 delete_mess = kFALSE;
3124 break;
3125
3126 case kMESS_OBJECT:
3127 if (fPlayer) fPlayer->HandleRecvHisto(mess);
3128 break;
3129
3130 case kPROOF_FATAL:
3131 { TString msg;
3132 if ((mess->BufferSize() > mess->Length()))
3133 (*mess) >> msg;
3134 if (msg.IsNull()) {
3135 MarkBad(s, "received kPROOF_FATAL");
3136 } else {
3137 MarkBad(s, msg);
3138 }
3139 }
3141 // Finalize the progress dialog
3142 Emit("StopProcess(Bool_t)", kTRUE);
3143 }
3144 break;
3145
3146 case kPROOF_STOP:
3147 // Stop collection from this worker
3148 Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3149 sl->GetOrdinal());
3150 rc = 1;
3151 break;
3152
3154 // Add the message to the list
3155 fRecvMessages->Add(mess);
3156 delete_mess = kFALSE;
3157 rc = 1;
3158 break;
3159
3160 case kPROOF_TOUCH:
3161 // send a request for touching the remote admin file
3162 {
3163 sl->Touch();
3164 }
3165 break;
3166
3167 case kPROOF_GETOBJECT:
3168 // send slave object it asks for
3169 mess->ReadString(str, sizeof(str));
3170 obj = gDirectory->Get(str);
3171 if (obj)
3172 s->SendObject(obj);
3173 else
3174 s->Send(kMESS_NOTOK);
3175 break;
3176
3177 case kPROOF_GETPACKET:
3178 {
3179 PDB(kGlobal,2)
3180 Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3181 TDSetElement *elem = 0;
3182 elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3183
3184 if (elem != (TDSetElement*) -1) {
3186 answ << elem;
3187 s->Send(answ);
3188
3189 while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3190 TPair *p = (TPair*) fWaitingSlaves->First();
3191 s = (TSocket*) p->Key();
3192 TMessage *m = (TMessage*) p->Value();
3193
3194 elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3195 if (elem != (TDSetElement*) -1) {
3197 a << elem;
3198 s->Send(a);
3199 // remove has to happen via Links because TPair does not have
3200 // a Compare() function and therefore RemoveFirst() and
3201 // Remove(TObject*) do not work
3203 delete p;
3204 delete m;
3205 } else {
3206 break;
3207 }
3208 }
3209 } else {
3210 if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3211 fWaitingSlaves->Add(new TPair(s, mess));
3212 delete_mess = kFALSE;
3213 }
3214 }
3215 break;
3216
3217 case kPROOF_LOGFILE:
3218 {
3219 Int_t size;
3220 (*mess) >> size;
3221 PDB(kGlobal,2)
3222 Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3223 RecvLogFile(s, size);
3224 }
3225 break;
3226
3227 case kPROOF_LOGDONE:
3228 (*mess) >> sl->fStatus >> sl->fParallel;
3229 PDB(kCollect,2)
3230 Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3231 sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3232 if (sl->fStatus != 0) {
3233 // Return last nonzero status
3234 fStatus = sl->fStatus;
3235 // Deactivate the worker, if required
3236 if (deactonfail) DeactivateWorker(sl->fOrdinal);
3237 }
3238 // Remove from the workers-ready list
3242 }
3243 rc = 1;
3244 break;
3245
3246 case kPROOF_GETSTATS:
3247 {
3248 (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3249 >> sl->fWorkDir >> sl->fProofWorkDir;
3250 PDB(kCollect,2)
3251 Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3252 TString img;
3253 if ((mess->BufferSize() > mess->Length()))
3254 (*mess) >> img;
3255 // Set image
3256 if (img.IsNull()) {
3257 if (sl->fImage.IsNull())
3258 sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3259 sl->fProofWorkDir.Data());
3260 } else {
3261 sl->fImage = img;
3262 }
3263 PDB(kGlobal,2)
3264 Info("HandleInputMessage",
3265 "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3266
3267 fBytesRead += sl->fBytesRead;
3268 fRealTime += sl->fRealTime;
3269 fCpuTime += sl->fCpuTime;
3270 rc = 1;
3271 }
3272 break;
3273
3274 case kPROOF_GETPARALLEL:
3275 {
3276 Bool_t async = kFALSE;
3277 (*mess) >> sl->fParallel;
3278 if ((mess->BufferSize() > mess->Length()))
3279 (*mess) >> async;
3280 rc = (async) ? 0 : 1;
3281 }
3282 break;
3283
3284 case kPROOF_CHECKFILE:
3285 { // New servers (>= 5.22) send the status
3286 if ((mess->BufferSize() > mess->Length())) {
3287 (*mess) >> fCheckFileStatus;
3288 } else {
3289 // For old servers this meant success (failure was signaled with the
3290 // dangerous kPROOF_FATAL)
3291 fCheckFileStatus = 1;
3292 }
3293 rc = 1;
3294 }
3295 break;
3296
3297 case kPROOF_SENDFILE:
3298 { // New server: signals ending of sendfile operation
3299 rc = 1;
3300 }
3301 break;
3302
3304 {
3305 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3306 Int_t type = 0;
3307 (*mess) >> type;
3308 switch (type) {
3311 fEnabledPackages = (TList *) mess->ReadObject(TList::Class());
3312 if (fEnabledPackages) {
3314 } else {
3315 Error("HandleInputMessage",
3316 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3317 }
3318 break;
3321 fAvailablePackages = (TList *) mess->ReadObject(TList::Class());
3322 if (fAvailablePackages) {
3324 } else {
3325 Error("HandleInputMessage",
3326 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3327 }
3328 break;
3329 default:
3330 Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3331 }
3332 }
3333 break;
3334
3335 case kPROOF_SENDOUTPUT:
3336 {
3337 // We start measuring the merging time
3339
3340 // Worker is ready to send output: make sure the relevant bit is reset
3342 PDB(kGlobal,2)
3343 Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3344 // Create the list if not yet done
3345 if (!fWrksOutputReady) {
3346 fWrksOutputReady = new TList;
3348 }
3349 fWrksOutputReady->Add(sl);
3350 }
3351 break;
3352
3354 {
3355 // We start measuring the merging time
3357
3358 PDB(kGlobal,2)
3359 Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3360 Int_t type = 0;
3361 const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3363 Info("HandleInputMessage", "finalization on %s started ...", prefix);
3365 }
3366
3367 while ((mess->BufferSize() > mess->Length())) {
3368 (*mess) >> type;
3369 // If a query result header, add it to the player list
3370 if (fPlayer) {
3371 if (type == 0) {
3372 // Retrieve query result instance (output list not filled)
3373 TQueryResult *pq =
3374 (TQueryResult *) mess->ReadObject(TQueryResult::Class());
3375 if (pq) {
3376 // Add query to the result list in TProofPlayer
3379 // And clear the output list, as we start merging a new set of results
3380 if (fPlayer->GetOutputList())
3382 // Add the unique query tag as TNamed object to the input list
3383 // so that it is available in TSelectors for monitoring
3384 TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3385 if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3386 fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3387 fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3388 } else {
3389 Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3390 }
3391 } else if (type > 0) {
3392 // Read object
3393 TObject *o = mess->ReadObject(TObject::Class());
3394 // Increment counter on the client side
3396 TString msg;
3397 Bool_t changed = kFALSE;
3398 msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3399 if (gProofServ) {
3401 } else if (IsTty() || changed) {
3402 fprintf(stderr, "%s\r", msg.Data());
3403 }
3404 // Add or merge it
3405 if ((fPlayer->AddOutputObject(o) == 1)) {
3406 // Remove the object if it has been merged
3407 SafeDelete(o);
3408 }
3409 if (type > 1) {
3410 // Update the merger progress info
3412 if (TestBit(TProof::kIsClient) && !IsLite()) {
3413 // In PROOFLite this has to be done once only in TProofLite::Process
3415 if (pq) {
3417 // Add input objects (do not override remote settings, if any)
3418 TObject *xo = 0;
3419 TIter nxin(fPlayer->GetInputList());
3420 // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3421 if (!pq->GetInputList()) pq->SetInputList(new TList());
3422 while ((xo = nxin()))
3423 if (!pq->GetInputList()->FindObject(xo->GetName()))
3424 pq->AddInput(xo->Clone());
3425 // If the last object, notify the GUI that the result arrived
3426 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3427 }
3428 // Processing is over
3429 UpdateDialog();
3430 }
3431 }
3432 }
3433 } else {
3434 Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3435 }
3436 }
3437 }
3438 break;
3439
3440 case kPROOF_OUTPUTLIST:
3441 {
3442 // We start measuring the merging time
3443
3444 PDB(kGlobal,2)
3445 Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3446 TList *out = 0;
3447 if (fPlayer) {
3449 if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3450 out = (TList *) mess->ReadObject(TList::Class());
3451 } else {
3452 TQueryResult *pq =
3453 (TQueryResult *) mess->ReadObject(TQueryResult::Class());
3454 if (pq) {
3455 // Add query to the result list in TProofPlayer
3458 // To avoid accidental cleanups from anywhere else
3459 // remove objects from gDirectory and clone the list
3460 out = pq->GetOutputList();
3461 CleanGDirectory(out);
3462 out = (TList *) out->Clone();
3463 // Notify the GUI that the result arrived
3464 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3465 } else {
3466 PDB(kGlobal,2)
3467 Info("HandleInputMessage",
3468 "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3469 }
3470 }
3471 if (out) {
3472 out->SetOwner();
3473 fPlayer->AddOutput(out); // Incorporate the list
3474 SafeDelete(out);
3475 } else {
3476 PDB(kGlobal,2)
3477 Info("HandleInputMessage",
3478 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3479 }
3480 } else {
3481 Warning("HandleInputMessage",
3482 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3483 }
3484 // On clients at this point processing is over
3485 if (TestBit(TProof::kIsClient) && !IsLite())
3486 UpdateDialog();
3487 }
3488 break;
3489
3490 case kPROOF_QUERYLIST:
3491 {
3492 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3493 (*mess) >> fOtherQueries >> fDrawQueries;
3494 if (fQueries) {
3495 fQueries->Delete();
3496 delete fQueries;
3497 fQueries = 0;
3498 }
3499 fQueries = (TList *) mess->ReadObject(TList::Class());
3500 }
3501 break;
3502
3503 case kPROOF_RETRIEVE:
3504 {
3505 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3506 TQueryResult *pq =
3507 (TQueryResult *) mess->ReadObject(TQueryResult::Class());
3508 if (pq && fPlayer) {
3510 // Notify the GUI that the result arrived
3511 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3512 } else {
3513 PDB(kGlobal,2)
3514 Info("HandleInputMessage",
3515 "kPROOF_RETRIEVE: query result missing or player undefined");
3516 }
3517 }
3518 break;
3519
3520 case kPROOF_MAXQUERIES:
3521 {
3522 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3523 Int_t max = 0;
3524
3525 (*mess) >> max;
3526 Printf("Number of queries fully kept remotely: %d", max);
3527 }
3528 break;
3529
3531 {
3532 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3533
3534 UInt_t tot = 0, done = 0;
3535 TString action;
3536 Bool_t st = kTRUE;
3537
3538 (*mess) >> action >> tot >> done >> st;
3539
3541 if (tot) {
3542 TString type = (action.Contains("submas")) ? "submasters"
3543 : "workers";
3544 Int_t frac = (Int_t) (done*100.)/tot;
3545 char msg[512] = {0};
3546 if (frac >= 100) {
3547 snprintf(msg, 512, "%s: OK (%d %s) \n",
3548 action.Data(),tot, type.Data());
3549 } else {
3550 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3551 action.Data(), done, tot, frac);
3552 }
3553 if (fSync)
3554 fprintf(stderr,"%s", msg);
3555 else
3556 NotifyLogMsg(msg, 0);
3557 }
3558 // Notify GUIs
3559 StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3560 } else {
3561
3562 // Just send the message one level up
3564 m << action << tot << done << st;
3566 }
3567 }
3568 break;
3569
3571 {
3572 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3573
3574 UInt_t tot = 0, done = 0;
3575 TString action;
3576 Bool_t st = kTRUE;
3577
3578 (*mess) >> action >> tot >> done >> st;
3579
3581 if (tot) {
3582 TString type = "files";
3583 Int_t frac = (Int_t) (done*100.)/tot;
3584 char msg[512] = {0};
3585 if (frac >= 100) {
3586 snprintf(msg, 512, "%s: OK (%d %s) \n",
3587 action.Data(),tot, type.Data());
3588 } else {
3589 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3590 action.Data(), done, tot, frac);
3591 }
3592 if (fSync)
3593 fprintf(stderr,"%s", msg);
3594 else
3595 NotifyLogMsg(msg, 0);
3596 }
3597 // Notify GUIs
3598 DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3599 } else {
3600
3601 // Just send the message one level up
3603 m << action << tot << done << st;
3605 }
3606 }
3607 break;
3608
3610 {
3611 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3612
3613 // For Proof-Lite this variable is the number of workers and is set
3614 // by the player
3615 if (!IsLite()) {
3616 fNotIdle = 1;
3618 }
3619
3620 // Redirect the output, if needed
3622
3623 // The signal is used on masters by XrdProofdProtocol to catch
3624 // the start of processing; on clients it allows to update the
3625 // progress dialog
3626 if (!TestBit(TProof::kIsMaster)) {
3627
3628 // This is the end of preparation
3629 fQuerySTW.Stop();
3631 PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3632
3633 TString selec;
3634 Int_t dsz = -1;
3635 Long64_t first = -1, nent = -1;
3636 (*mess) >> selec >> dsz >> first >> nent;
3637 // Start or reset the progress dialog
3638 if (!gROOT->IsBatch()) {
3639 if (fProgressDialog &&
3642 fProgressDialog->ExecPlugin(5, this,
3643 selec.Data(), dsz, first, nent);
3645 } else {
3646 ResetProgressDialog(selec, dsz, first, nent);
3647 }
3648 }
3650 }
3651 }
3652 }
3653 break;
3654
3655 case kPROOF_ENDINIT:
3656 {
3657 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3658
3660 if (fPlayer)
3662 }
3663 }
3664 break;
3665
3666 case kPROOF_SETIDLE:
3667 {
3668 PDB(kGlobal,2)
3669 Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3670
3671 // The session is idle
3672 if (IsLite()) {
3673 if (fNotIdle > 0) {
3674 fNotIdle--;
3675 PDB(kGlobal,2)
3676 Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3677 } else {
3678 Warning("HandleInputMessage",
3679 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3680 sl->GetOrdinal());
3681 }
3682 } else {
3683 fNotIdle = 0;
3684 // Check if the query has been enqueued
3685 if ((mess->BufferSize() > mess->Length()))
3686 (*mess) >> fIsWaiting;
3687 }
3688 }
3689 break;
3690
3692 {
3693 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3694
3695 // We have received the sequential number
3696 (*mess) >> fSeqNum;
3697 Bool_t sync = fSync;
3698 if ((mess->BufferSize() > mess->Length()))
3699 (*mess) >> sync;
3700 if (sync != fSync && fSync) {
3701 // The server required to switch to asynchronous mode
3702 Activate();
3703 fSync = kFALSE;
3704 }
3705 DisableGoAsyn();
3706 // Check if the query has been enqueued
3707 fIsWaiting = kTRUE;
3708 // For Proof-Lite this variable is the number of workers and is set by the player
3709 if (!IsLite())
3710 fNotIdle = 1;
3711
3712 rc = 1;
3713 }
3714 break;
3715
3716 case kPROOF_SESSIONTAG:
3717 {
3718 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3719
3720 // We have received the unique tag and save it as name of this object
3721 TString stag;
3722 (*mess) >> stag;
3723 SetName(stag);
3724 // In the TSlave object
3725 sl->SetSessionTag(stag);
3726 // Server may have also sent the group
3727 if ((mess->BufferSize() > mess->Length()))
3728 (*mess) >> fGroup;
3729 // Server may have also sent the user
3730 if ((mess->BufferSize() > mess->Length())) {
3731 TString usr;
3732 (*mess) >> usr;
3733 if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3734 }
3735 }
3736 break;
3737
3738 case kPROOF_FEEDBACK:
3739 {
3740 PDB(kGlobal,2)
3741 Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3742 TList *out = (TList *) mess->ReadObject(TList::Class());
3743 out->SetOwner();
3744 if (fPlayer)
3745 fPlayer->StoreFeedback(sl, out); // Adopts the list
3746 else
3747 // Not yet ready: stop collect asap
3748 rc = 1;
3749 }
3750 break;
3751
3752 case kPROOF_AUTOBIN:
3753 {
3754 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3755
3756 TString name;
3757 Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3758
3759 (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3760
3762
3764
3765 answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3766
3767 s->Send(answ);
3768 }
3769 break;
3770
3771 case kPROOF_PROGRESS:
3772 {
3773 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3774
3775 if (GetRemoteProtocol() > 25) {
3776 // New format
3777 TProofProgressInfo *pi = 0;
3778 (*mess) >> pi;
3779 fPlayer->Progress(sl,pi);
3780 } else if (GetRemoteProtocol() > 11) {
3781 Long64_t total, processed, bytesread;
3782 Float_t initTime, procTime, evtrti, mbrti;
3783 (*mess) >> total >> processed >> bytesread
3784 >> initTime >> procTime
3785 >> evtrti >> mbrti;
3786 if (fPlayer)
3787 fPlayer->Progress(sl, total, processed, bytesread,
3788 initTime, procTime, evtrti, mbrti);
3789
3790 } else {
3791 // Old format
3792 Long64_t total, processed;
3793 (*mess) >> total >> processed;
3794 if (fPlayer)
3795 fPlayer->Progress(sl, total, processed);
3796 }
3797 }
3798 break;
3799
3800 case kPROOF_STOPPROCESS:
3801 {
3802 // This message is sent from a worker that finished processing.
3803 // We determine whether it was asked to finish by the
3804 // packetizer or stopped during processing a packet
3805 // (by TProof::RemoveWorkers() or by an external signal).
3806 // In the later case call packetizer->MarkBad.
3807 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3808
3809 Long64_t events = 0;
3810 Bool_t abort = kFALSE;
3811 TProofProgressStatus *status = 0;
3812
3813 if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3814 (*mess) >> status >> abort;
3815 } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3816 (*mess) >> events >> abort;
3817 } else {
3818 (*mess) >> events;
3819 }
3820 if (fPlayer) {
3821 if (fProtocol > 18) {
3822 TList *listOfMissingFiles = 0;
3823 if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3824 listOfMissingFiles = new TList();
3825 listOfMissingFiles->SetName("MissingFiles");
3826 if (fPlayer)
3827 fPlayer->AddOutputObject(listOfMissingFiles);
3828 }
3829 if (fPlayer->GetPacketizer()) {
3830 Int_t ret =
3831 fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3832 if (ret > 0)
3833 fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3834 // This object is now owned by the packetizer
3835 status = 0;
3836 }
3837 if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3838 } else {
3839 fPlayer->AddEventsProcessed(events);
3840 }
3841 }
3842 SafeDelete(status);
3844 Emit("StopProcess(Bool_t)", abort);
3845 break;
3846 }
3847
3848 case kPROOF_SUBMERGER:
3849 {
3850 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3851 HandleSubmerger(mess, sl);
3852 }
3853 break;
3854
3856 {
3857 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3858
3859 Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3860 Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3861 TList* tmpinfo = 0;
3862 (*mess) >> tmpinfo;
3863 if (tmpinfo == 0) {
3864 Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3865 } else {
3866 tmpinfo->SetOwner(kFALSE);
3867 Int_t nentries = tmpinfo->GetSize();
3868 for (Int_t i=0; i<nentries; i++) {
3869 TSlaveInfo* slinfo =
3870 dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3871 if (slinfo) {
3872 // If PROOF-Lite
3873 if (IsLite()) slinfo->fHostName = gSystem->HostName();
3874 // Check if we have already a instance for this worker
3875 TIter nxw(fSlaveInfo);
3876 TSlaveInfo *ourwi = 0;
3877 while ((ourwi = (TSlaveInfo *)nxw())) {
3878 if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3879 ourwi->SetSysInfo(slinfo->GetSysInfo());
3880 ourwi->fHostName = slinfo->GetName();
3881 if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3882 ourwi->fDataDir = slinfo->GetDataDir();
3883 break;
3884 }
3885 }
3886 if (!ourwi) {
3887 fSlaveInfo->Add(slinfo);
3888 } else {
3889 slinfo = ourwi;
3890 }
3891 if (slinfo->fStatus != TSlaveInfo::kBad) {
3892 if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3893 if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3894 }
3895 if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3896 slinfo->fMsd = sl->GetMsd();
3897 }
3898 }
3899 delete tmpinfo;
3900 rc = 1;
3901 }
3902 }
3903 break;
3904
3906 {
3907 PDB(kGlobal,2)
3908 Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3909 TDSet* dset = 0;
3910 (*mess) >> dset;
3911 if (!fDSet)
3912 Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3913 else
3914 fDSet->Validate(dset);
3915 delete dset;
3916 }
3917 break;
3918
3919 case kPROOF_DATA_READY:
3920 {
3921 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3922 Bool_t dataready = kFALSE;
3923 Long64_t totalbytes, bytesready;
3924 (*mess) >> dataready >> totalbytes >> bytesready;
3925 fTotalBytes += totalbytes;
3926 fBytesReady += bytesready;
3927 if (dataready == kFALSE) fDataReady = dataready;
3928 }
3929 break;
3930
3931 case kPROOF_PING:
3932 // do nothing (ping is already acknowledged)
3933 break;
3934
3935 case kPROOF_MESSAGE:
3936 {
3937 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3938
3939 // We have received the unique tag and save it as name of this object
3940 TString msg;
3941 (*mess) >> msg;
3942 Bool_t lfeed = kTRUE;
3943 if ((mess->BufferSize() > mess->Length()))
3944 (*mess) >> lfeed;
3945
3947
3948 if (fSync) {
3949 // Notify locally
3950 fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3951 } else {
3952 // Notify locally taking care of redirection, windows logs, ...
3953 NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3954 }
3955 } else {
3956
3957 // The message is logged for debugging purposes.
3958 fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3959 if (gProofServ) {
3960 // We hide it during normal operations
3962
3963 // And send the message one level up
3964 gProofServ->SendAsynMessage(msg, lfeed);
3965 }
3966 }
3967 }
3968 break;
3969
3971 {
3972 TString vac;
3973 (*mess) >> vac;
3974 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3975 Int_t from = 0;
3976 TString vers, archcomp;
3977 if (vac.Tokenize(vers, from, "|"))
3978 vac.Tokenize(archcomp, from, "|");
3979 sl->SetArchCompiler(archcomp);
3980 vers.ReplaceAll(":","|");
3981 sl->SetROOTVersion(vers);
3982 }
3983 break;
3984
3985 default:
3986 {
3987 Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3988 sl->GetOrdinal(), what);
3989 }
3990 break;
3991 }
3992
3993 // Cleanup
3994 if (delete_mess)
3995 delete mess;
3996
3997 // We are done successfully
3998 return rc;
3999}
4000
4001////////////////////////////////////////////////////////////////////////////////
4002/// Process a message of type kPROOF_SUBMERGER
///
/// The integer sub-type is read first from `mess` and selects the action:
///  - kOutputSent: handled only when IsEndMaster() is true. Reads a merger id,
///    records one more merged worker on that merger and deactivates the merger
///    once all its workers are merged; when no active merger is left the merger
///    list is cleared and deleted.
///  - kMergerDown: reads a merger id, deactivates that merger if it is still
///    active, sends a kStopMerging message on the socket of `sl` and asks the
///    merger node and every worker assigned to it for their output.
///  - kOutputSize: handled only when IsEndMaster() is true. Reads the number of
///    output objects and the merging port reported by `sl`; on the first call
///    (fMergersSet not yet set) the number of mergers is decided, afterwards the
///    worker is asked for its output, promoted to merger or redirected to one.
///
/// NOTE(review): this listing lacks the function signature and several
/// statements (for instance the declarations of `activeWorkers` and of the
/// `stop` message, and the calls in the `if (gProofServ)` branches); confirm the
/// notes below against the complete source.
4003
4005{
4006 // Message sub-type
4007 Int_t type = 0;
4008 (*mess) >> type;
// Socket of the sending worker, used below for direct replies to it
4009 TSocket *s = sl->GetSocket();
4010
4011 switch (type) {
4012 case kOutputSent:
4013 {
4014 if (IsEndMaster()) {
4015 Int_t merger_id = -1;
4016 (*mess) >> merger_id;
4017
4018 PDB(kSubmerger, 2)
4019 Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4020 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4021
// NOTE(review): only the upper bound is checked, a negative id is not rejected
// here; the error text says "kOutputSize" although this is the kOutputSent branch
4022 if (!fMergers || fMergers->GetSize() <= merger_id) {
4023 Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4024 break;
4025 }
4026 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4027 mi->SetMergedWorker();
4028 if (mi->AreAllWorkersMerged()) {
4029 mi->Deactivate();
// Last active merger gone: drop the whole merger list
4030 if (GetActiveMergersCount() == 0) {
4031 fMergers->Clear();
4032 delete fMergers;
4034 fMergersCount = -1;
4036 PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4037 }
4038 }
4039 } else {
4040 PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4041 }
4042 }
4043 break;
4044
4045 case kMergerDown:
4046 {
4047 Int_t merger_id = -1;
4048 (*mess) >> merger_id;
4049
4050 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4051
4052 if (!fMergers || fMergers->GetSize() <= merger_id) {
4053 Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4054 break;
4055 }
4056
4057 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
// A merger already marked inactive needs no further handling
4058 if (!mi->IsActive()) {
4059 break;
4060 } else {
4061 mi->Deactivate();
4062 }
4063
4064 // Stop the invalid merger in the case it is still listening
// NOTE(review): the declaration of `stop` is not visible in this listing
4066 stop << Int_t(kStopMerging);
4067 stop << 0;
4068 s->Send(stop);
4069
4070 // Ask for results from merger (only original results from this node as worker are returned)
4071 AskForOutput(mi->GetMerger());
4072
4073 // Ask for results from all workers assigned to this merger
4074 TIter nxo(mi->GetWorkers());
4075 TObject * o = 0;
4076 while ((o = nxo())) {
4077 AskForOutput((TSlave *)o);
4078 }
4079 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4080 }
4081 break;
4082
4083 case kOutputSize:
4084 {
4085 if (IsEndMaster()) {
4086 PDB(kSubmerger, 2)
4087 Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4088
// Prefix for user-visible messages: the server prefix, or "Lite-0" without gProofServ
4089 const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4090 if (!fFinalizationRunning) {
4091 Info("HandleSubmerger", "finalization on %s started ...", prefix);
4093 }
4094
4095 Int_t output_size = 0;
4096 Int_t merging_port = 0;
4097 (*mess) >> output_size >> merging_port;
4098
4099 PDB(kSubmerger, 2) Info("HandleSubmerger",
4100 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4101 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4102 TString msg;
4103 if (!fMergersSet) {
4104
4106
// NOTE(review): `activeWorkers` is used below but its declaration is not
// visible in this listing
4107 // First pass - setting number of mergers according to user or dynamically
4108 fMergersCount = -1; // No mergers used if not set by user
4109 TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4110 if (mc) fMergersCount = mc->GetVal(); // Value set by user
4111 TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4112 if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4113
4114 // Mergers count specified by user but not valid
4115 if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4116 msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4117 prefix, fMergersCount, activeWorkers);
4118 if (gProofServ)
4120 else
4121 Printf("%s",msg.Data());
4122 fMergersCount = 0;
4123 }
4124 // Mergers count will be set dynamically
4125 if ((fMergersCount == 0) && (!fMergersByHost)) {
// Nearest integer to sqrt(activeWorkers); the truncated square root is used
// instead when that would leave fewer than two workers per merger
4126 if (activeWorkers > 1) {
4127 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4128 if (activeWorkers / fMergersCount < 2)
4129 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4130 }
4131 if (fMergersCount > 1)
4132 msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4133 prefix, fMergersCount, activeWorkers);
4134 else {
4135 msg.Form("%s: No mergers will be used for %d workers",
4136 prefix, activeWorkers);
4137 fMergersCount = -1;
4138 }
4139 if (gProofServ)
4141 else
4142 Printf("%s",msg.Data());
4143 } else if (fMergersByHost) {
4144 // We force mergers at host level to minimize network traffic
4145 if (activeWorkers > 1) {
// Count the distinct names found in fSlaves: one merger per name
4146 fMergersCount = 0;
4147 THashList hosts;
4148 TIter nxwk(fSlaves);
4149 TObject *wrk = 0;
4150 while ((wrk = nxwk())) {
4151 if (!hosts.FindObject(wrk->GetName())) {
4152 hosts.Add(new TObjString(wrk->GetName()));
4153 fMergersCount++;
4154 }
4155 }
4156 }
4157 if (fMergersCount > 1)
4158 msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4159 prefix, fMergersCount, activeWorkers);
4160 else {
4161 msg.Form("%s: No mergers will be used for %d workers",
4162 prefix, activeWorkers);
4163 fMergersCount = -1;
4164 }
4165 if (gProofServ)
4167 else
4168 Printf("%s",msg.Data());
4169 } else {
4170 msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4171 prefix, fMergersCount, activeWorkers);
4172 if (gProofServ)
4174 else
4175 Printf("%s",msg.Data());
4176 }
4177
4178 // We started merging; we call it here because fMergersCount is still the original number
4179 // and can be saved internally
4181
4182 // Update merger counters (new workers are not yet active)
4184
4185 if (fMergersCount > 0) {
4186
4187 fMergers = new TList();
4189 // Total number of workers, which will not act as mergers ('pure workers')
4190 fWorkersToMerge = (activeWorkers - fMergersCount);
4191 // Establish the first merger
4192 if (!CreateMerger(sl, merging_port)) {
4193 // Cannot establish first merger
4194 AskForOutput(sl);
4196 fMergersCount--;
4197 }
4199 } else {
4200 AskForOutput(sl);
4201 }
4203 } else {
4204 // Multiple pass
4205 if (fMergersCount == -1) {
4206 // No mergers. Workers send their outputs directly to master
4207 AskForOutput(sl);
4208 } else {
// While fRedirectNext is positive (and not merging by host) the worker is
// redirected to an existing merger instead of being considered as a new merger
4209 if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4210 RedirectWorker(s, sl, output_size);
4211 fRedirectNext--;
4212 } else {
4213 Bool_t newMerger = kTRUE;
4214 if (fMergersByHost) {
// By host: no new merger if one with the same name as this worker already exists
4215 TIter nxmg(fMergers);
4216 TMergerInfo *mgi = 0;
4217 while ((mgi = (TMergerInfo *) nxmg())) {
4218 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4219 newMerger = kFALSE;
4220 break;
4221 }
4222 }
4223 }
4224 if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4225 // Still not enough mergers established
4226 if (!CreateMerger(sl, merging_port)) {
4227 // Cannot establish a merger
4228 AskForOutput(sl);
4230 fMergersCount--;
4231 }
4232 } else
4233 RedirectWorker(s, sl, output_size);
4234 }
4235 }
4236 }
4237 } else {
4238 Error("HandleSubMerger","kOutputSize received not on endmaster!");
4239 }
4240 }
4241 break;
4242 }
4243}
4244
4245////////////////////////////////////////////////////////////////////////////////
4246/// Redirect output of worker sl to some merger
4247
4248void TProof::RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size)
4249{
4250 Int_t merger_id = -1;
4251
4252 if (fMergersByHost) {
4253 for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4254 TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4255 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4256 merger_id = i;
4257 break;
4258 }
4259 }
4260 } else {
4261 merger_id = FindNextFreeMerger();
4262 }
4263
4264 if (merger_id == -1) {
4265 // No free merger (probably it had crashed before)
4266 AskForOutput(sl);
4267 } else {
4268 TMessage sendoutput(kPROOF_SUBMERGER);
4269 sendoutput << Int_t(kSendOutput);
4270 PDB(kSubmerger, 2)
4271 Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4272
4273 PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4274 if (!fMergers || fMergers->GetSize() <= merger_id) {
4275 Error("RedirectWorker", "#%d not in list ", merger_id);
4276 return;
4277 }
4278 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4279
4280 TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName();
4281 sendoutput << merger_id;
4282 sendoutput << hname;
4283 sendoutput << mi->GetPort();
4284 s->Send(sendoutput);
4285 mi->AddMergedObjects(output_size);
4286 mi->AddWorker(sl);
4287 }
4288}
4289
4290////////////////////////////////////////////////////////////////////////////////
4291/// Return a merger, which is both active and still accepts some workers to be
4292/// assigned to it. It works on the 'round-robin' basis.
///
/// The visible code scans fMergers starting at fLastAssignedMerger, skipping
/// entries that are not active or that have all their workers assigned. The
/// value returned is either fLastAssignedMerger (which is then incremented) or
/// -1 when no suitable merger is found.
///
/// NOTE(review): this listing lacks the function signature, the bodies of both
/// `while` loops and the `if` lines matching the two `else` branches; confirm
/// the exact wrap-around logic against the complete source. fMergers is
/// dereferenced here without a null check.
4293
4295{
// First scan: from the current position towards the end of the list
4296 while (fLastAssignedMerger < fMergers->GetSize() &&
4297 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4298 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4300 }
4301
4304 } else {
4305 return fLastAssignedMerger++;
4306 }
4307
// Second scan with the same skip condition
4308 while (fLastAssignedMerger < fMergers->GetSize() &&
4309 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4310 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4312 }
4313
4315 return -1;
4316 } else {
4317 return fLastAssignedMerger++;
4318 }
4319}
4320
4321////////////////////////////////////////////////////////////////////////////////
4322/// Master asks for output from worker sl
///
/// A kPROOF_SUBMERGER message of sub-type kSendOutput is sent on the socket of
/// `sl`, carrying -1, the string "master" and -1 in that order.
/// RedirectWorker() fills the same three positions with merger id, host name
/// and port.
///
/// NOTE(review): the function signature and one statement after the Send()
/// call are not visible in this listing.
4323
4325{
4326 TMessage sendoutput(kPROOF_SUBMERGER);
4327 sendoutput << Int_t(kSendOutput);
4328
4329 PDB(kSubmerger, 2) Info("AskForOutput",
4330 "worker %s was asked to send its output to master",
4331 sl->GetOrdinal());
4332
// Same layout as the redirection message: id, host, port
4333 sendoutput << -1;
4334 sendoutput << TString("master");
4335 sendoutput << -1;
4336 sl->GetSocket()->Send(sendoutput);
4338}
4339
4340////////////////////////////////////////////////////////////////////////////////
4341/// Final update of the progress dialog
///
/// Does nothing when fPlayer is not set. Otherwise there are two conditional
/// sections, one for abort and one for stop: each reports the number of events
/// processed (with Info() when fSync is set), calls Progress() in the format
/// matching GetRemoteProtocol() and emits "StopProcess(Bool_t)" with kTRUE
/// (abort) or kFALSE (stop). Finally a "Progress(...)" signal with all
/// arguments set to -1 is emitted, again in the format selected by the remote
/// protocol version (> 25, > 11, or older).
///
/// NOTE(review): the function signature, the `if` lines opening the abort and
/// stop sections, the last argument of the two Info() calls and the old-format
/// Progress() calls are not visible in this listing.
4342
4344{
4345 if (!fPlayer) return;
4346
4347 // Handle abort ...
4349 if (fSync)
4350 Info("UpdateDialog",
4351 "processing was aborted - %lld events processed",
4353
4354 if (GetRemoteProtocol() > 11) {
4355 // New format
4356 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4357 } else {
4359 }
4360 Emit("StopProcess(Bool_t)", kTRUE);
4361 }
4362
4363 // Handle stop ...
4365 if (fSync)
4366 Info("UpdateDialog",
4367 "processing was stopped - %lld events processed",
4369
4370 if (GetRemoteProtocol() > 25) {
4371 // New format
4372 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4373 } else if (GetRemoteProtocol() > 11) {
4374 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4375 } else {
4377 }
4378 Emit("StopProcess(Bool_t)", kFALSE);
4379 }
4380
4381 // Final update of the dialog box
// All values are -1; the argument count given to EmitVA (10, 7 or 2) matches
// the signal signature selected by the remote protocol version
4382 if (GetRemoteProtocol() > 25) {
4383 // New format
4384 EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4385 10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4386 (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4387 } else if (GetRemoteProtocol() > 11) {
4388 // New format
4389 EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4390 7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4391 (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4392 } else {
4393 EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4394 }
4395}
4396
4397////////////////////////////////////////////////////////////////////////////////
4398/// Activate the a-sync input handler.
///
/// Loops over all entries of fSlaves and, for each one that has an input
/// handler, calls Add() on that handler.
///
/// NOTE(review): the function signature is not visible in this listing.
4399
4401{
4402 TIter next(fSlaves);
4403 TSlave *sl;
4404
// Slaves without an input handler are skipped
4405 while ((sl = (TSlave*) next()))
4406 if (sl->GetInputHandler())
4407 sl->GetInputHandler()->Add();
4408}
4409
4410////////////////////////////////////////////////////////////////////////////////
4411/// De-activate a-sync input handler.
///
/// Loops over all entries of fSlaves and, for each one that has an input
/// handler, calls Remove() on that handler.
///
/// NOTE(review): the function signature is not visible in this listing.
4412
4414{
4415 TIter next(fSlaves);
4416 TSlave *sl;
4417
// Slaves without an input handler are skipped
4418 while ((sl = (TSlave*) next()))
4419 if (sl->GetInputHandler())
4420 sl->GetInputHandler()->Remove();
4421}
4422
4423////////////////////////////////////////////////////////////////////////////////
4424/// Get the active mergers count
///
/// Returns the number of entries in fMergers for which IsActive() is true, or 0
/// when the list of mergers does not exist.
///
/// NOTE(review): the function signature is not visible in this listing.
4425
4427{
// No list means no mergers at all
4428 if (!fMergers) return 0;
4429
4430 Int_t active_mergers = 0;
4431
4432 TIter mergers(fMergers);
4433 TMergerInfo *mi = 0;
4434 while ((mi = (TMergerInfo *)mergers())) {
4435 if (mi->IsActive()) active_mergers++;
4436 }
4437
4438 return active_mergers;
4439}
4440
4441////////////////////////////////////////////////////////////////////////////////
4442/// Create a new merger
///
/// Makes worker `sl` a merger listening on `port`. Returns kFALSE without doing
/// anything else when `port` is not positive, kTRUE otherwise.
///
/// The number of additional workers assigned to the new merger is:
///  - not by host: fWorkersToMerge divided by the number of mergers still to be
///    created, plus one when the division has a remainder and fewer mergers
///    than that remainder exist so far;
///  - by host (fMergersByHost): the number of entries in fActiveSlaves with the
///    same name as `sl`, minus one.
///
/// A kPROOF_SUBMERGER / kBeMerger message carrying the merger index (current
/// size of fMergers) and the number of workers is sent to `sl`; the new
/// TMergerInfo is appended to fMergers, fWorkersToMerge is reduced accordingly
/// and fRedirectNext is set to half the number of assigned workers.
///
/// NOTE(review): the function signature and the statement inside
/// `if (gProofServ)` are not visible in this listing. `mergersToCreate` is used
/// as a divisor without a zero check; presumably callers guarantee
/// fMergersCount > fMergers->GetSize() -- confirm.
4443
4445{
4446 PDB(kSubmerger, 2)
4447 Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4448
4449 PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4450
4451 if (port <= 0) {
4452 PDB(kSubmerger,2)
4453 Info("CreateMerger", "cannot create merger on port %d - exit", port);
4454 return kFALSE;
4455 }
4456
4457 Int_t workers = -1;
4458 if (!fMergersByHost) {
4459 Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4460 // Number of pure workers, which are not simply divisible by mergers
4461 Int_t rest = fWorkersToMerge % mergersToCreate;
4462 // We add one more worker for each of the first 'rest' mergers being established
4463 if (rest > 0 && fMergers->GetSize() < rest) {
4464 rest = 1;
4465 } else {
4466 rest = 0;
4467 }
4468 workers = (fWorkersToMerge / mergersToCreate) + rest;
4469 } else {
// Count the active slaves sharing the name of this worker (itself included)
4470 Int_t workersOnHost = 0;
4471 for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4472 if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4473 }
4474 workers = workersOnHost - 1;
4475 }
4476
4477 TString msg;
4478 msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4479
4480 if (gProofServ) {
4482 } else {
4483 Printf("%s",msg.Data());
4484 }
4485 TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4486
// The merger index sent to the worker is the position the new entry will take
// in fMergers (it is appended below)
4487 TMessage bemerger(kPROOF_SUBMERGER);
4488 bemerger << Int_t(kBeMerger);
4489 bemerger << fMergers->GetSize();
4490 bemerger << workers;
4491 sl->GetSocket()->Send(bemerger);
4492
4493 PDB(kSubmerger,2) Info("CreateMerger",
4494 "merger #%d (port: %d) for %d workers started",
4495 fMergers->GetSize(), port, workers);
4496
4497 fMergers->Add(merger);
4498 fWorkersToMerge = fWorkersToMerge - workers;
4499
4500 fRedirectNext = workers / 2;
4501
4502 PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4503 return kTRUE;
4504}
4505
4506////////////////////////////////////////////////////////////////////////////////
4507/// Add a bad slave server to the bad slave list and remove it from
4508/// the active list and from the two monitor objects. Assume that the work
4509/// done by this worker was lost and ask the packetizer to reassign it.
4510
4511void TProof::MarkBad(TSlave *wrk, const char *reason)
4512{
 // Flag worker 'wrk' as bad. Depending on 'reason' this reports the problem,
 // asks the packetizer (on a master) to handle the worker's lost work, removes
 // the worker from the active bookkeeping and finally, on a master, either
 // deletes it (planned termination) or moves it to fBadSlaves; on a client the
 // session is discarded instead.
 // 'reason' may be null; it is compared against kPROOF_TerminateWorker and
 // kPROOF_WorkerIdleTO to select the path taken.
 // NOTE(review): the embedded listing numbers skip 4558, 4594, 4603, 4606,
 // 4608, 4619-4620, 4628, 4636-4638, 4647 and 4658, so statements appear to be
 // missing from this extract; each gap is flagged inline below.
4513 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4514
4515 // We may have been invalidated in the meanwhile: nothing to do in such a case
4516 if (!IsValid()) return;
4517
4518 if (!wrk) {
4519 Error("MarkBad", "worker instance undefined: protocol error? ");
4520 return;
4521 }
4522
4523 // Local URL
 // Function-local static: filled on the first call that finds it empty and
 // reused afterwards. On a master it is the host FQDN plus, if set, the
 // "ProofServ.XpdPort" port; otherwise user@host:port taken from fUrl.
4524 static TString thisurl;
4525 if (thisurl.IsNull()) {
4526 if (IsMaster()) {
4527 Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4528 thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4529 if (port > 0) thisurl += TString::Format(":%d", port);
4530 } else {
4531 thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4532 }
4533 }
4534
 // Taken when no reason is given, or when the reason is neither a planned
 // termination nor an idle timeout: build and emit a diagnostic message.
4535 if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4536 // Message for notification
4537 const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4538 TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4539 TString msg;
4540 msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4541 src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4542 (reason && strlen(reason)) ? reason : "unknown");
4543 Info("MarkBad", "%s", msg.Data());
4544 // Notify one level up, if the case
4545 // Add some hint for diagnostics
4546 if (gProofServ) {
4547 msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4548 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4549 } else {
4550 msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4551 }
4552 msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4553 msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4554 msg += TString::Format(" +++\n");
4555 if (gProofServ) {
4556 msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4557 "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
 // NOTE(review): listing line 4558 is absent from this extract; as shown, the
 // extended 'msg' is never delivered in this branch. Presumably it was sent
 // one level up through gProofServ - confirm against the original file.
4559 } else {
4560 msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4561 "Display(\"*\")\n\n", thisurl.Data());
4562 Printf("%s", msg.Data());
4563 }
4564 } else if (reason) {
 // Here 'reason' is one of the two known constants; the debug line is only
 // printed when it is not the idle timeout.
4565 if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4566 Info("MarkBad", "worker %s at %s:%d asked to terminate",
4567 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4568 }
4569 }
4570
4571 if (IsMaster() && reason) {
4572 if (strcmp(reason, kPROOF_TerminateWorker)) {
4573 // if the reason was not a planned termination
 // Reuse the "MissingFiles" output list if present, otherwise create it.
 // NOTE(review): when fPlayer is null the newly created list is not handed
 // to any owner within this block.
4574 TList *listOfMissingFiles = 0;
4575 if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4576 listOfMissingFiles = new TList();
4577 listOfMissingFiles->SetName("MissingFiles");
4578 if (fPlayer)
4579 fPlayer->AddOutputObject(listOfMissingFiles);
4580 }
4581 // If a query is being processed, assume that the work done by
4582 // the worker was lost and needs to be reassigned.
4583 TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4584 if (packetizer) {
4585 // the worker was lost so do resubmit the packets
4586 packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4587 }
4588 } else {
4589 // Tell the coordinator that we are gone
4590 if (gProofServ) {
 // Keep only the part of the ordinal that follows its last '.'
4591 TString ord(wrk->GetOrdinal());
4592 Int_t id = ord.Last('.');
4593 if (id != kNPOS) ord.Remove(0, id+1);
 // NOTE(review): listing line 4594 is absent from this extract; 'ord' is
 // computed but not used in the visible code. Presumably it was passed to
 // gProofServ to notify the coordinator - confirm against the original file.
4595 }
4596 }
4597 } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4598 // We are invalid after this
4599 fValid = kFALSE;
4600 }
4601
 // Drop the worker from the active list and its socket from fAllMonitor.
 // NOTE(review): listing lines 4603, 4606 and 4608 are absent from this
 // extract; the doc header speaks of two monitor objects but only fAllMonitor
 // is visible here.
4602 fActiveSlaves->Remove(wrk);
4604
4605 fAllMonitor->Remove(wrk->GetSocket());
4607
4609
4610 if (IsMaster()) {
4611 if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4612 // if the reason was a planned termination then delete the worker and
4613 // remove it from all the lists
4614 fSlaves->Remove(wrk);
4615 fBadSlaves->Remove(wrk);
4616 fActiveSlaves->Remove(wrk);
4617 fInactiveSlaves->Remove(wrk);
4618 fUniqueSlaves->Remove(wrk);
 // NOTE(review): listing lines 4619-4620 are absent from this extract.
4621
4622 // we add it to the list of terminated slave infos instead, so that it
4623 // stays available in the .workers persistent file
4624 TSlaveInfo *si = new TSlaveInfo(
4625 wrk->GetOrdinal(),
4626 Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4627 0, "", wrk->GetWorkDir());
 // NOTE(review): listing line 4628 is absent from this extract, so the 'else'
 // below has no visible matching 'if'; presumably that line added 'si' to the
 // list of terminated slave infos when that list exists - confirm.
4629 else delete si;
4630
4631 delete wrk;
4632 } else {
 // Unplanned loss: keep the worker object, but on the bad list
4633 fBadSlaves->Add(wrk);
4634 fActiveSlaves->Remove(wrk);
4635 fUniqueSlaves->Remove(wrk);
 // NOTE(review): listing lines 4636-4638 are absent from this extract.
4639 wrk->Close();
4640 // Update the mergers count, if needed
4641 if (fMergersSet) {
4642 Int_t mergersCount = -1;
4643 TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4644 if (mc) mergersCount = mc->GetVal(); // Value set by user
4645 // Mergers count is set dynamically: recalculate it
4646 if (mergersCount == 0) {
 // NOTE(review): listing line 4647 is absent from this extract; it presumably
 // declared 'activeWorkers' (likely from the size of fActiveSlaves) - confirm.
 // The count is the rounded square root of activeWorkers, falling back to the
 // truncated square root when the integer quotient per merger is below 2.
4648 if (activeWorkers > 1) {
4649 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4650 if (activeWorkers / fMergersCount < 2)
4651 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4652 }
4653 }
4654 }
4655 }
4656
4657 // Update session workers files
 // NOTE(review): listing line 4658 is absent from this extract; the statement
 // announced by the comment above is not visible.
4659 } else {
4660 // On clients the proof session should be removed from the lists
4661 // and deleted, since it is not valid anymore
4662 fSlaves->Remove(wrk);
4663 if (fManager)
4664 fManager->DiscardSession(this);
4665 }
4666}
4667
4668////////////////////////////////////////////////////////////////////////////////
4669/// Add slave with socket s to the bad slave list and remove if from
4670/// the active list and from the two monitor objects.
4671
4672void TProof::MarkBad(TSocket *s, const char *reason)
4673{
4674 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4675
4676 // We may have been invalidated in the meanwhile: nothing to do in such a case
4677 if (!IsValid()) return;
4678
4679 TSlave *wrk = FindSlave(s);
4680 MarkBad(wrk, reason);
4681}
4682
4683////////////////////////////////////////////////////////////////////////////////
4684/// Ask an active worker 'wrk' to terminate, i.e. to shut down
4685
4687{
 // Ask worker 'wrk' to stop by sending it a kPROOF_STOP message, provided its
 // socket exists and is valid. A null 'wrk' only produces a warning.
 // NOTE(review): the signature line of this definition (listing line 4686) is
 // absent from this extract; 'wrk' is presumably its only parameter.
4688 if (!wrk) {
4689 Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4690 return;
4691 }
4692
4693 // Send stop message
4694 if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4695 TMessage mess(kPROOF_STOP);
4696 wrk->GetSocket()->Send(mess);
4697 } else {
 // No usable connection: the stop request cannot be delivered, which is
 // only reported when debugging is enabled
4698 if (gDebug > 0)
4699 Info("TerminateWorker", "connection to worker is already down: cannot"
4700 " send termination message");
4701 }
4702
4703 // This is a bad worker from now on
 // NOTE(review): listing line 4704 is absent from this extract; the statement
 // announced by the comment above (presumably a MarkBad call on 'wrk') is not
 // visible - confirm against the original file.
4705}
4706
4707////////////////////////////////////////////////////////////////////////////////
4708/// Ask an active worker 'ord' to terminate, i.e. to shutdown
4709
4710void TProof::TerminateWorker(const char *ord)
4711{
4712 if (ord && strlen(ord) > 0) {
4713 Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
4714 if (IsMaster()) {
4715 TIter nxw(fSlaves);
4716 TSlave *wrk = 0;
4717 while ((wrk = (TSlave *)nxw())) {
4718 if (all || !strcmp(wrk->GetOrdinal(), ord)) {
4719 TerminateWorker(wrk);
4720 if (!all) break;
4721 }
4722 }
4723 } else {
4724 TMessage mess(kPROOF_STOP);
4725 mess << TString(ord);
4726 Broadcast(mess);
4727 }
4728 }
4729}
4730
4731////////////////////////////////////////////////////////////////////////////////
4732/// Ping PROOF. Returns 1 if master server responded.
4733
4735{
 // Delegates to the list-based Ping() with the kActive selector and returns
 // its result unchanged.
 // NOTE(review): the signature line of this definition (listing line 4734) is
 // absent from this extract.
4736 return Ping(kActive);
4737}
4738
4739////////////////////////////////////////////////////////////////////////////////
4740/// Ping PROOF slaves. Returns the number of slaves that responded.
4741
4743{
4744 TList *slaves = 0;
4745 if (list == kAll) slaves = fSlaves;
4746 if (list == kActive) slaves = fActiveSlaves;
4747 if (list == kUnique) slaves = fUniqueSlaves;
4748 if (list == kAllUnique) slaves = fAllUniqueSlaves;
4749
4750 if (slaves->GetSize() == 0) return 0;
4751
4752 int nsent = 0;
4753 TIter next(slaves);
4754
4755 TSlave *sl;
4756 while ((sl = (TSlave *)next())) {
4757 if (sl->IsValid()) {
4758 if (sl->Ping() == -1) {
4759 MarkBad(sl, "ping unsuccessful");
4760 } else {
4761 nsent++;
4762 }
4763 }
4764 }
4765
4766 return nsent;
4767}