Logo ROOT  
Reference Guide
TProof.cxx
Go to the documentation of this file.
1// @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2// Author: Fons Rademakers 13/02/97
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/**
13\defgroup proof PROOF
14
15Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.
16
17\deprecated
18We keep PROOF for those who still need it for legacy use cases.
19PROOF is no longer developed and receives only limited support.
20For several years now, %ROOT has relied on RDataFrame and related products as its multi-core/multi-processing engines.
21
22*/
23
24/**
25\defgroup proofkernel PROOF kernel Libraries
26\ingroup proof
27
28The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
29the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
30and handling of results.
31
32*/
33
34/** \class TProof
35\ingroup proofkernel
36
37This class controls a Parallel ROOT Facility, PROOF, cluster.
38It fires the worker servers, it keeps track of how many workers are
39running, it keeps track of the workers running status, it broadcasts
40messages to all workers, it collects results, etc.
41
42*/
43
44#include <stdlib.h>
45#include <fcntl.h>
46#include <errno.h>
47#ifdef WIN32
48# include <io.h>
49# include <sys/stat.h>
50# include <sys/types.h>
51# include "snprintf.h"
52#else
53# include <unistd.h>
54#endif
55#include <vector>
56
57#include "RConfigure.h"
58#include "Riostream.h"
59#include "Getline.h"
60#include "TBrowser.h"
61#include "TChain.h"
62#include "TCondor.h"
63#include "TDSet.h"
64#include "TError.h"
65#include "TEnv.h"
66#include "TEntryList.h"
67#include "TEventList.h"
68#include "TFile.h"
69#include "TFileInfo.h"
70#include "TFTP.h"
71#include "THashList.h"
72#include "TInterpreter.h"
73#include "TKey.h"
74#include "TMap.h"
75#include "TMath.h"
76#include "TMessage.h"
77#include "TMonitor.h"
78#include "TObjArray.h"
79#include "TObjString.h"
80#include "TParameter.h"
81#include "TProof.h"
82#include "TProofNodeInfo.h"
83#include "TProofOutputFile.h"
84#include "TVirtualProofPlayer.h"
85#include "TVirtualPacketizer.h"
86#include "TProofServ.h"
87#include "TPluginManager.h"
88#include "TQueryResult.h"
89#include "TRandom.h"
90#include "TRegexp.h"
91#include "TROOT.h"
92#include "TSlave.h"
93#include "TSocket.h"
94#include "TSortedList.h"
95#include "TSystem.h"
96#include "TTree.h"
97#include "TUrl.h"
98#include "TFileCollection.h"
99#include "TDataSetManager.h"
100#include "TDataSetManagerFile.h"
101#include "TMacro.h"
102#include "TSelector.h"
103#include "TPRegexp.h"
104#include "TPackMgr.h"
105
106#include <mutex>
107
109
110// Rotating indicator
// (the four characters below, printed one after the other, draw a spinning cursor)
111char TProofMergePrg::fgCr[4] = {'-', '\\', '|', '/'};
112
// Static TProof data members, shared by all TProof instances.
// fgProofEnvList holds name/value entries that are looked up by name and read
// via GetTitle() (see ParseConfigField()); presumably filled by TProof::AddEnvVar().
113TList *TProof::fgProofEnvList = 0; // List of env vars for proofserv
114TPluginHandler *TProof::fgLogViewer = 0; // Log viewer handler
115
117
118//----- PROOF Interrupt signal handler -----------------------------------------
119////////////////////////////////////////////////////////////////////////////////
120/// TProof interrupt handler.
/// When the session is not attached to a terminal (fProof->IsTty() is false)
/// or the remote protocol is older than 22, the user is not prompted.
/// Otherwise the user is prompted with Getline():
///  - 'Q'/'q' stops remote processing with abort (StopProcess(kTRUE)),
///  - 'S'/'s' stops it without abort (StopProcess(kFALSE)),
///  - 'A'/'a' requests a switch to asynchronous mode,
///  - any other key continues.
/// The handler returns kTRUE.
121
123{
124 if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
125
126 // Cannot ask the user : abort any remote processing
128
129 } else {
130 // Real stop or request to switch to asynchronous?
131 const char *a = 0;
 // NOTE(review): this test can never be true. The enclosing 'else' is only
 // reached when GetRemoteProtocol() >= 22, so the first prompt below is dead
 // code, and the '>= 22' test on the 'A'/'a' branch further down is redundant.
132 if (fProof->GetRemoteProtocol() < 22) {
133 a = Getline("\nSwitch to asynchronous mode not supported remotely:"
134 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
135 } else {
136 a = Getline("\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
137 " any other key to continue: ");
138 }
 // NOTE(review): 'a' is dereferenced without a null check; this assumes
 // Getline() never returns null - TODO confirm.
139 if (a[0] == 'Q' || a[0] == 'S' || a[0] == 'q' || a[0] == 's') {
140
141 Info("Notify","Processing interrupt signal ... %c", a[0]);
142
143 // Stop or abort any remote processing
144 Bool_t abort = (a[0] == 'Q' || a[0] == 'q') ? kTRUE : kFALSE;
145 fProof->StopProcess(abort);
146
147 } else if ((a[0] == 'A' || a[0] == 'a') && fProof->GetRemoteProtocol() >= 22) {
148 // Stop any remote processing
150 }
151 }
152
153 return kTRUE;
154}
155
156//----- Input handler for messages from TProofServ -----------------------------
157////////////////////////////////////////////////////////////////////////////////
158/// Constructor
159
161 : TFileHandler(s->GetDescriptor(),1),
162 fSocket(s), fProof(p)
163{
164}
165
166////////////////////////////////////////////////////////////////////////////////
167/// Handle input
168
170{
172 return kTRUE;
173}
174
175
176//------------------------------------------------------------------------------
177
179
180////////////////////////////////////////////////////////////////////////////////
181/// Used to sort slaveinfos by ordinal.
/// Ordinals are compared component by component. Components are separated
/// by '.' and converted with atoi(). Return values:
///  - 1 if obj is null;
///  - fOrdinal.CompareTo(obj->GetName()) if obj is not a TSlaveInfo;
///  - 1 if the first differing component of this ordinal is numerically
///    smaller than the other's, -1 if it is larger;
///  - if all common components match: -1 when this ordinal has more
///    components, 1 when the other one has more, 0 when both end together.
/// NOTE(review): assuming TString::CompareTo is strcmp-like, the numeric
/// branch orders in the opposite sense of the string fallback, i.e. sorting
/// yields descending ordinals. Confirm this is intended.
182
184{
185 if (!obj) return 1;
186
187 const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
188
189 if (!si) return fOrdinal.CompareTo(obj->GetName());
190
191 const char *myord = GetOrdinal();
192 const char *otherord = si->GetOrdinal();
193 while (myord && otherord) {
194 Int_t myval = atoi(myord);
195 Int_t otherval = atoi(otherord);
196 if (myval < otherval) return 1;
197 if (myval > otherval) return -1;
 // Move each cursor just past its next '.'; it becomes null when no
 // further component is left.
198 myord = strchr(myord, '.');
199 if (myord) myord++;
200 otherord = strchr(otherord, '.');
201 if (otherord) otherord++;
202 }
 // One (or both) ordinals ran out of components: the longer one sorts first
203 if (myord) return -1;
204 if (otherord) return 1;
205 return 0;
206}
207
208////////////////////////////////////////////////////////////////////////////////
209/// Used to compare slaveinfos by ordinal.
/// Returns kTRUE only if obj is a non-null TSlaveInfo whose ordinal string is
/// identical to this one. The comparison is a plain strcmp(): unlike Compare(),
/// components are not parsed as numbers, so e.g. "0.1" and "0.01" are not
/// considered equal here.
210
212{
213 if (!obj) return kFALSE;
214 const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
215 if (!si) return kFALSE;
 // Assumes both GetOrdinal() results are non-null C strings - TODO confirm
216 return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
217}
218
219////////////////////////////////////////////////////////////////////////////////
220/// Print slave info. If opt = "active" print only the active
221/// slaves, if opt="notactive" print only the not active slaves,
222/// if opt = "bad" print only the bad slaves, else
223/// print all slaves.
/// Any 'N' in opt is removed before this filter is applied, and it selects a
/// compact one-line "Worker: ..." format with these fields:
///  - the ordinal;
///  - the host name, plus core count and RAM when fSysInfo.fCpus > 0;
///  - the msd and data directory, when set;
///  - the status.
/// Without 'N', the legacy "Slave: ..." line is written to std::cout with:
///  - the hostname;
///  - the msd, or "<null>";
///  - the perf index;
///  - the status.
224
226{
 // Any status other than kActive or kBad is reported as "not active"
227 TString stat = fStatus == kActive ? "active" :
228 fStatus == kBad ? "bad" :
229 "not active";
230
231 Bool_t newfmt = kFALSE;
232 TString oo(opt);
233 if (oo.Contains("N")) {
234 newfmt = kTRUE;
235 oo.ReplaceAll("N","");
236 }
237 if (oo == "active" && fStatus != kActive) return;
238 if (oo == "notactive" && fStatus != kNotActive) return;
239 if (oo == "bad" && fStatus != kBad) return;
240
241 if (newfmt) {
 // Optional "| ..." sections are left empty when the information is unavailable
242 TString msd, si, datadir;
243 if (!(fMsd.IsNull())) msd.Form("| msd: %s ", fMsd.Data());
244 if (!(fDataDir.IsNull())) datadir.Form("| datadir: %s ", fDataDir.Data());
245 if (fSysInfo.fCpus > 0) {
246 si.Form("| %s, %d cores, %d MB ram", fHostName.Data(),
248 } else {
249 si.Form("| %s", fHostName.Data());
250 }
251 Printf("Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
252
253 } else {
254 TString msd = fMsd.IsNull() ? "<null>" : fMsd.Data();
255
256 std::cout << "Slave: " << fOrdinal
257 << " hostname: " << fHostName
258 << " msd: " << msd
259 << " perf index: " << fPerfIndex
260 << " " << stat
261 << std::endl;
262 }
263}
264
265////////////////////////////////////////////////////////////////////////////////
266/// Setter for fSysInfo
/// Copies these fields of 'si' into fSysInfo, one assignment per field:
/// OS, computer model, CPU type, number of CPUs, CPU and bus speed,
/// L2 cache size and physical RAM.
267
269{
270 fSysInfo.fOS = si.fOS; // OS
271 fSysInfo.fModel = si.fModel; // computer model
272 fSysInfo.fCpuType = si.fCpuType; // type of cpu
273 fSysInfo.fCpus = si.fCpus; // number of cpus
274 fSysInfo.fCpuSpeed = si.fCpuSpeed; // cpu speed in MHz
275 fSysInfo.fBusSpeed = si.fBusSpeed; // bus speed in MHz
276 fSysInfo.fL2Cache = si.fL2Cache; // level 2 cache size in KB
277 fSysInfo.fPhysRam = si.fPhysRam; // Physical RAM
278}
279
281
282//------------------------------------------------------------------------------
283
284////////////////////////////////////////////////////////////////////////////////
285/// Destructor
286
288{
289 // Just delete the list, the objects are owned by other list
290 if (fWorkers) {
293 }
294}
295////////////////////////////////////////////////////////////////////////////////
296/// Increase number of already merged workers by 1
297
299{
301 Error("SetMergedWorker", "all workers have been already merged before!");
302 else
304}
305
306////////////////////////////////////////////////////////////////////////////////
307/// Add new worker to the list of workers to be merged by this merger
/// The worker list is created on first use. If it already holds
/// fWorkersToMerge entries, the call is rejected with an Error() message and
/// the worker is not added. Only the pointer 'sl' is stored in the list.
308
310{
311 if (!fWorkers)
312 fWorkers = new TList();
 // Refuse to exceed the number of workers this merger was sized for
313 if (fWorkersToMerge == fWorkers->GetSize()) {
314 Error("AddWorker", "all workers have been already assigned to this merger");
315 return;
316 }
317 fWorkers->Add(sl);
318}
319
320////////////////////////////////////////////////////////////////////////////////
321/// Return if merger has already merged all workers, i.e. if it has finished its merging job
322
324{
326}
327
328////////////////////////////////////////////////////////////////////////////////
329/// Return if the determined number of workers has been already assigned to this merger
/// i.e. kTRUE when the worker list holds exactly fWorkersToMerge entries.
/// Returns kFALSE while fWorkers is still null (no worker list yet), even
/// when fWorkersToMerge is 0.
330
332{
333 if (!fWorkers)
334 return kFALSE;
335
336 return (fWorkers->GetSize() == fWorkersToMerge);
337}
338
339////////////////////////////////////////////////////////////////////////////////
340/// This a private API function.
341/// It checks whether the connection string contains a PoD cluster protocol.
342/// If it does, then the connection string will be changed to reflect
343/// a real PROOF connection string for a PROOF cluster managed by PoD.
344/// PoD: http://pod.gsi.de .
345/// Return -1 if the PoD request failed; return 0 otherwise.
346
347static Int_t PoDCheckUrl(TString *_cluster)
348{
349 if ( !_cluster )
350 return 0;
351
352 // trim spaces from both sides of the string
353 *_cluster = _cluster->Strip( TString::kBoth );
354 // PoD protocol string
355 const TString pod_prot("pod");
356
357 // URL test
358 // TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
359 TUrl url( _cluster->Data() );
360 if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
361 return 0;
362
363 // PoD cluster is used
364 // call pod-info in a batch mode (-b).
365 // pod-info will find either a local PoD cluster or
366 // a remote one, manged by pod-remote.
367 *_cluster = gSystem->GetFromPipe("pod-info -c -b");
368 if( 0 == _cluster->Length() ) {
369 Error("PoDCheckUrl", "PoD server is not running");
370 return -1;
371 }
372 return 0;
373}
374
375////////////////////////////////////////////////////////////////////////////////
376/// Create a PROOF environment. Starting PROOF involves either connecting
377/// to a master server, which in turn will start a set of slave servers, or
378/// directly starting as master server (if master = ""). Masterurl is of
379/// the form: [proof[s]://]host[:port]. Conffile is the name of the config
380/// file describing the remote PROOF cluster (this argument allows you to
381/// describe different cluster configurations).
382/// The default is proof.conf. Confdir is the directory where the config
383/// file and other PROOF related files are (like motd and noproof files).
384/// Loglevel is the log level (default = 1). User specified custom config
385/// files will be first looked for in $HOME/.conffile.
386
387TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
388 Int_t loglevel, const char *alias, TProofMgr *mgr)
389 : fUrl(masterurl)
390{
391 // Default initializations
392 InitMembers();
393
394 // This may be needed during init
395 fManager = mgr;
396
397 // Default server type
399
400 // Default query mode
402
403 // Parse the main URL, adjusting the missing fields and setting the relevant
404 // bits
407
408 // Protocol and Host
409 if (!masterurl || strlen(masterurl) <= 0) {
410 fUrl.SetProtocol("proof");
411 fUrl.SetHost("__master__");
412 } else if (!(strstr(masterurl, "://"))) {
413 fUrl.SetProtocol("proof");
414 }
415 // Port
416 if (fUrl.GetPort() == TUrl(" ").GetPort())
417 fUrl.SetPort(TUrl("proof:// ").GetPort());
418
419 // Make sure to store the FQDN, so to get a solid reference for subsequent checks
420 if (!strcmp(fUrl.GetHost(), "__master__"))
421 fMaster = fUrl.GetHost();
422 else if (!strlen(fUrl.GetHost()))
424 else
426
427 // Server type
428 if (strlen(fUrl.GetOptions()) > 0) {
429 TString opts(fUrl.GetOptions());
430 if (!(strncmp(fUrl.GetOptions(),"std",3))) {
432 opts.Remove(0,3);
433 fUrl.SetOptions(opts.Data());
434 } else if (!(strncmp(fUrl.GetOptions(),"lite",4))) {
436 opts.Remove(0,4);
437 fUrl.SetOptions(opts.Data());
438 }
439 }
440
441 // Instance type
445 if (fMaster == "__master__") {
449 } else if (fMaster == "prooflite") {
450 // Client and master are merged
453 }
454 // Flag that we are a client
456 if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
457
458 Init(masterurl, conffile, confdir, loglevel, alias);
459
460 // If the user was not set, get it from the master
461 if (strlen(fUrl.GetUser()) <= 0) {
462 TString usr, emsg;
463 if (Exec("gProofServ->GetUser()", "0", kTRUE) == 0) {
464 TObjString *os = fMacroLog.GetLineWith("const char");
465 if (os) {
466 Ssiz_t fst = os->GetString().First('\"');
467 Ssiz_t lst = os->GetString().Last('\"');
468 usr = os->GetString()(fst+1, lst-fst-1);
469 } else {
470 emsg = "could not find 'const char *' string in macro log";
471 }
472 } else {
473 emsg = "could not retrieve user info";
474 }
475 if (!emsg.IsNull()) {
476 // Get user logon name
478 if (pw) {
479 usr = pw->fUser;
480 delete pw;
481 }
482 Warning("TProof", "%s: using local default %s", emsg.Data(), usr.Data());
483 }
484 // Set the user name in the main URL
485 fUrl.SetUser(usr.Data());
486 }
487
488 // If called by a manager, make sure it stays in last position
489 // for cleaning
490 if (mgr) {
492 gROOT->GetListOfSockets()->Remove(mgr);
493 gROOT->GetListOfSockets()->Add(mgr);
494 }
495
496 // Old-style server type: we add this to the list and set the global pointer
498 if (!gROOT->GetListOfProofs()->FindObject(this))
499 gROOT->GetListOfProofs()->Add(this);
500
501 // Still needed by the packetizers: needs to be changed
502 gProof = this;
503}
504
505////////////////////////////////////////////////////////////////////////////////
506/// Protected constructor to be used by classes deriving from TProof
507/// (they have to call Init themselves and override StartSlaves
508/// appropriately).
509///
510/// This constructor simply closes any previous gProof and sets gProof
511/// to this instance.
512
513TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
514{
515 // Default initializations
516 InitMembers();
517
518 if (!gROOT->GetListOfProofs()->FindObject(this))
519 gROOT->GetListOfProofs()->Add(this);
520
521 gProof = this;
522}
523
524////////////////////////////////////////////////////////////////////////////////
525/// Default initializations
526
528{
529 fValid = kFALSE;
530 fTty = kFALSE;
531 fRecvMessages = 0;
532 fSlaveInfo = 0;
537 fActiveSlaves = 0;
538 fInactiveSlaves = 0;
539 fUniqueSlaves = 0;
542 fActiveMonitor = 0;
543 fUniqueMonitor = 0;
545 fCurrentMonitor = 0;
546 fBytesRead = 0;
547 fRealTime = 0;
548 fCpuTime = 0;
549 fIntHandler = 0;
550 fProgressDialog = 0;
553 fPlayer = 0;
554 fFeedback = 0;
555 fChains = 0;
556 fDSet = 0;
557 fNotIdle = 0;
558 fSync = kTRUE;
562 fLogFileW = 0;
563 fLogFileR = 0;
566 fMacroLog.SetName("ProofLogMacro");
567
568 fWaitingSlaves = 0;
569 fQueries = 0;
570 fOtherQueries = 0;
571 fDrawQueries = 0;
572 fMaxDrawQueries = 1;
573 fSeqNum = 0;
574
575 fSessionID = -1;
577
578 fPackMgr = 0;
580
581 fInputData = 0;
582
583 fPrintProgress = 0;
584
585 fLoadedMacros = 0;
586
587 fProtocol = -1;
588 fSlaves = 0;
590 fBadSlaves = 0;
591 fAllMonitor = 0;
593 fBytesReady = 0;
594 fTotalBytes = 0;
597 fRunningDSets = 0;
598
599 fCollectTimeout = -1;
600
601 fManager = 0;
604
607 fMergers = 0;
608 fMergersCount = -1;
610 fWorkersToMerge = 0;
612
613 fPerfTree = "";
614
616
617 fSelector = 0;
618
619 fPrepTime = 0.;
620
621 // Check if the user defined a list of environment variables to send over:
622 // include them into the dedicated list
623 if (gSystem->Getenv("PROOF_ENVVARS")) {
624 TString envs(gSystem->Getenv("PROOF_ENVVARS")), env, envsfound;
625 Int_t from = 0;
626 while (envs.Tokenize(env, from, ",")) {
627 if (!env.IsNull()) {
628 if (!gSystem->Getenv(env)) {
629 Warning("Init", "request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
630 } else {
631 if (!envsfound.IsNull()) envsfound += ",";
632 envsfound += env;
634 TProof::AddEnvVar(env, gSystem->Getenv(env));
635 }
636 }
637 }
638 if (envsfound.IsNull()) {
639 Warning("Init", "none of the requested env variables were found: '%s'", envs.Data());
640 } else {
641 Info("Init", "the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
642 }
643 }
644
645 // Done
646 return;
647}
648
649////////////////////////////////////////////////////////////////////////////////
650/// Clean up PROOF environment.
651
653{
654 if (fChains) {
655 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
656 // remove "chain" from list
657 chain->SetProof(0);
658 RemoveChain(chain);
659 }
660 }
661
662 // remove links to packages enabled on the client
664 // iterate over all packages
666 if (epl) {
667 TIter nxp(epl);
668 while (TObjString *pck = (TObjString *)(nxp())) {
669 FileStat_t stat;
670 if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
671 // check if symlink, if so unlink
672 // NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
673 // existing file or to a directory, but if fIsLink is true the symlink exists
674 if (stat.fIsLink)
675 gSystem->Unlink(pck->String());
676 }
677 }
678 epl->Delete();
679 delete epl;
680 }
681 }
682
683 Close();
710 if (fWrksOutputReady) {
712 delete fWrksOutputReady;
713 }
714 if (fQueries) {
715 fQueries->Delete();
716 delete fQueries;
717 }
718
719 // remove file with redirected logs
721 if (fLogFileR)
722 fclose(fLogFileR);
723 if (fLogFileW)
724 fclose(fLogFileW);
725 if (fLogFileName.Length() > 0)
727 }
728
729 // Remove for the global list
730 gROOT->GetListOfProofs()->Remove(this);
731 // ... and from the manager list
732 if (fManager && fManager->IsValid())
734
735 if (gProof && gProof == this) {
736 // Set previous as default
737 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
738 while ((gProof = (TProof *)pvp())) {
740 break;
741 }
742 }
743
744 // For those interested in our destruction ...
745 Emit("~TProof()");
746 Emit("CloseWindow()");
747}
748
749////////////////////////////////////////////////////////////////////////////////
750/// Start the PROOF environment. Starting PROOF involves either connecting
751/// to a master server, which in turn will start a set of slave servers, or
752/// directly starting as master server (if master = ""). For a description
753/// of the arguments see the TProof ctor. Returns the number of started
754/// master or slave servers, returns 0 in case of error, in which case
755/// fValid remains false.
756
757Int_t TProof::Init(const char *, const char *conffile,
758 const char *confdir, Int_t loglevel, const char *alias)
759{
761
762 fValid = kFALSE;
763
764 // Connected to terminal?
765 fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
766
767 // If in attach mode, options is filled with additional info
768 Bool_t attach = kFALSE;
769 if (strlen(fUrl.GetOptions()) > 0) {
770 attach = kTRUE;
771 // A flag from the GUI
772 TString opts = fUrl.GetOptions();
773 if (opts.Contains("GUI")) {
775 opts.Remove(opts.Index("GUI"));
776 fUrl.SetOptions(opts);
777 }
778 }
779
781 // Fill default conf file and conf dir
782 if (!conffile || !conffile[0])
784 if (!confdir || !confdir[0])
786 // The group; the client receives it in the kPROOF_SESSIONTAG message
788 } else {
789 fConfDir = confdir;
790 fConfFile = conffile;
791 }
792
793 // Analysise the conffile field
794 if (fConfFile.Contains("workers=0")) fConfFile.ReplaceAll("workers=0", "masteronly");
796
798 fLogLevel = loglevel;
801 fImage = fMasterServ ? "" : "<local>";
802 fIntHandler = 0;
803 fStatus = 0;
804 fRecvMessages = new TList;
806 fSlaveInfo = 0;
807 fChains = new TList;
810 fRunningDSets = 0;
812 fInputData = 0;
814 fPrintProgress = 0;
815
816 // Timeout for some collect actions
817 fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
818
819 // Should the workers be started dynamically; default: no
820 fDynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
821
822 // Default entry point for the data pool is the master
824 fDataPoolUrl.Form("root://%s", fMaster.Data());
825 else
826 fDataPoolUrl = "";
827
828 fProgressDialog = 0;
830
831 // Default alias is the master name
832 TString al = (alias) ? alias : fMaster.Data();
833 SetAlias(al);
834
835 // Client logging of messages from the master and slaves
838 fLogFileName.Form("%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
839 if ((fLogFileW = fopen(fLogFileName, "w")) == 0)
840 Error("Init", "could not create temporary logfile");
841 if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
842 Error("Init", "could not open temp logfile for reading");
843 }
845
846 // Status of cluster
847 fNotIdle = 0;
848 // Query type
849 fSync = (attach) ? kFALSE : kTRUE;
850 // Not enqueued
852
853 // Counters
854 fBytesRead = 0;
855 fRealTime = 0;
856 fCpuTime = 0;
857
858 // List of queries
859 fQueries = 0;
860 fOtherQueries = 0;
861 fDrawQueries = 0;
862 fMaxDrawQueries = 1;
863 fSeqNum = 0;
864
865 // Remote ID of the session
866 fSessionID = -1;
867
868 // Part of active query
869 fWaitingSlaves = 0;
870
871 // Make remote PROOF player
872 fPlayer = 0;
873 MakePlayer();
874
875 fFeedback = new TList;
877 fFeedback->SetName("FeedbackList");
879
880 // sort slaves by descending performance index
882 fActiveSlaves = new TList;
884 fUniqueSlaves = new TList;
887 fBadSlaves = new TList;
888 fAllMonitor = new TMonitor;
892 fCurrentMonitor = 0;
893
896
897 fLoadedMacros = 0;
898 fPackMgr = 0;
899
900 // Enable optimized sending of streamer infos to use embedded backward/forward
901 // compatibility support between different ROOT versions and different versions of
902 // users classes
903 Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
904 if (enableSchemaEvolution) {
906 } else {
907 Info("TProof", "automatic schema evolution in TMessage explicitly disabled");
908 }
909
910 if (IsMaster()) {
911 // to make UploadPackage() method work on the master as well.
913 } else {
914
915 TString sandbox;
916 if (GetSandbox(sandbox, kTRUE) != 0) {
917 Error("Init", "failure asserting sandbox directory %s", sandbox.Data());
918 return 0;
919 }
920
921 // Package Dir
922 TString packdir = gEnv->GetValue("Proof.PackageDir", "");
923 if (packdir.IsNull())
924 packdir.Form("%s/%s", sandbox.Data(), kPROOF_PackDir);
925 if (AssertPath(packdir, kTRUE) != 0) {
926 Error("Init", "failure asserting directory %s", packdir.Data());
927 return 0;
928 }
929 fPackMgr = new TPackMgr(packdir);
930 if (gDebug > 0)
931 Info("Init", "package directory set to %s", packdir.Data());
932 }
933
934 if (!IsMaster()) {
935 // List of directories where to look for global packages
936 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
938 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
939 if (gDebug > 0)
940 Info("Init", " %d global package directories registered", nglb);
941 }
942
943 // Master may want dynamic startup
944 if (fDynamicStartup) {
945 if (!IsMaster()) {
946 // If on client - start the master
947 if (!StartSlaves(attach))
948 return 0;
949 }
950 } else {
951
952 // Master Only mode (for operations requiring only the master, e.g. dataset browsing,
953 // result retrieving, ...)
954 Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
955 if (!IsMaster() || !masterOnly) {
956 // Start slaves (the old, static, per-session way)
957 if (!StartSlaves(attach))
958 return 0;
959 // Client: Is Master in dynamic startup mode?
960 if (!IsMaster()) {
961 Int_t dyn = 0;
962 GetRC("Proof.DynamicStartup", dyn);
963 if (dyn != 0) fDynamicStartup = kTRUE;
964 }
965 }
966 }
967 // we are now properly initialized
968 fValid = kTRUE;
969
970 // De-activate monitor (will be activated in Collect)
972
973 // By default go into parallel mode
974 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
975 TNamed *n = 0;
976 if (TProof::GetEnvVars() &&
977 (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
978 TString s(n->GetTitle());
979 if (s.IsDigit()) nwrk = s.Atoi();
980 }
981 GoParallel(nwrk, attach);
982
983 // Send relevant initial state to slaves
984 if (!attach)
986 else if (!IsIdle())
987 // redirect log
989
990 // Done at this point, the alias will be communicated to the coordinator, if any
992 SetAlias(al);
993
995
996 if (IsValid()) {
997
998 // Activate input handler
1000
1002 gROOT->GetListOfSockets()->Add(this);
1003 }
1004
1005 AskParallel();
1006
1007 return fActiveSlaves->GetSize();
1008}
1009
1010////////////////////////////////////////////////////////////////////////////////
1011/// Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
1012/// Use the existing setting or the default if nothing is found.
1013/// If 'assert' is kTRUE, make also sure that the path exists.
1014/// Return 0 on success, -1 on failure
///
/// Resolution order for 'sb':
///  1. if 'rc' is a non-empty name, the value of that rc variable, with the
///     incoming value of 'sb' passed as the default;
///  2. if 'sb' is still empty, the value of 'Proof.Sandbox';
///  3. if still empty, "~/" followed by kPROOF_WorkDir.
/// The special values "." and ".." are replaced by the current working
/// directory and its parent, respectively.
/// -1 is returned only when 'assert' is kTRUE and AssertPath() fails.
1015
1016Int_t TProof::GetSandbox(TString &sb, Bool_t assert, const char *rc)
1017{
1018 // Get it from 'rc', if defined
1019 if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1020 // Or use the default 'rc'
1021 if (sb.IsNull()) sb = gEnv->GetValue("Proof.Sandbox", "");
1022 // If nothing found, use the default
1023 if (sb.IsNull()) sb.Form("~/%s", kPROOF_WorkDir);
1024 // Expand special settings
1025 if (sb == ".") {
1026 sb = gSystem->pwd();
1027 } else if (sb == "..") {
1028 sb = gSystem->GetDirName(gSystem->pwd());
1029 }
1031
1032 // Assert the path, if required
1033 if (assert && AssertPath(sb, kTRUE) != 0) return -1;
1034 // Done
1035 return 0;
1036}
1037
1038////////////////////////////////////////////////////////////////////////////////
1039/// The config file field may contain special instructions which need to be
1040/// parsed at the beginning, e.g. for debug runs with valgrind.
1041/// Several options can be given separated by a ','
1042
1043void TProof::ParseConfigField(const char *config)
1044{
1045 TString sconf(config), opt;
1046 Ssiz_t from = 0;
1047 #ifdef R__LINUX
1048 Bool_t cpuPin = kFALSE;
1049 #endif
1050
1051 // Analysise the field
1052 const char *cq = (IsLite()) ? "\"" : "";
1053 while (sconf.Tokenize(opt, from, ",")) {
1054 if (opt.IsNull()) continue;
1055
1056 if (opt.BeginsWith("valgrind")) {
1057 // Any existing valgrind setting? User can give full settings, which we fully respect,
1058 // or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1059 // TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1060 // will add option "--time-stamp --leak-check=full" to our default options
1061 TString mst, top, sub, wrk, all;
1062 TList *envs = fgProofEnvList;
1063 TNamed *n = 0;
1064 if (envs) {
1065 if ((n = (TNamed *) envs->FindObject("PROOF_WRAPPERCMD")))
1066 all = n->GetTitle();
1067 if ((n = (TNamed *) envs->FindObject("PROOF_MASTER_WRAPPERCMD")))
1068 mst = n->GetTitle();
1069 if ((n = (TNamed *) envs->FindObject("PROOF_TOPMASTER_WRAPPERCMD")))
1070 top = n->GetTitle();
1071 if ((n = (TNamed *) envs->FindObject("PROOF_SUBMASTER_WRAPPERCMD")))
1072 sub = n->GetTitle();
1073 if ((n = (TNamed *) envs->FindObject("PROOF_SLAVE_WRAPPERCMD")))
1074 wrk = n->GetTitle();
1075 }
1076 if (all != "" && mst == "") mst = all;
1077 if (all != "" && top == "") top = all;
1078 if (all != "" && sub == "") sub = all;
1079 if (all != "" && wrk == "") wrk = all;
1080 if (all != "" && all.BeginsWith("valgrind_opts:")) {
1081 // The field is used to add an option Reset the setting
1082 Info("ParseConfigField","valgrind run: resetting 'PROOF_WRAPPERCMD':"
1083 " must be set again for next run , if any");
1084 TProof::DelEnvVar("PROOF_WRAPPERCMD");
1085 }
1086 TString var, cmd;
1087 cmd.Form("%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1088 TString mstlab("NO"), wrklab("NO");
1089 Bool_t doMaster = (opt == "valgrind" || (opt.Contains("master") &&
1090 !opt.Contains("topmaster") && !opt.Contains("submaster")))
1091 ? kTRUE : kFALSE;
1092 if (doMaster) {
1093 if (!IsLite()) {
1094 // Check if we have to add a var
1095 if (mst == "" || mst.BeginsWith("valgrind_opts:")) {
1096 mst.ReplaceAll("valgrind_opts:","");
1097 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1098 TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", var);
1099 mstlab = "YES";
1100 } else if (mst != "") {
1101 mstlab = "YES";
1102 }
1103 } else {
1104 if (opt.Contains("master")) {
1105 Warning("ParseConfigField",
1106 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1107 opt.ReplaceAll("master", "");
1108 if (!opt.Contains("workers")) return;
1109 }
1110 if (opt == "valgrind" || opt == "valgrind=") opt = "valgrind=workers";
1111 }
1112 }
1113 if (opt.Contains("topmaster")) {
1114 // Check if we have to add a var
1115 if (top == "" || top.BeginsWith("valgrind_opts:")) {
1116 top.ReplaceAll("valgrind_opts:","");
1117 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1118 TProof::AddEnvVar("PROOF_TOPMASTER_WRAPPERCMD", var);
1119 mstlab = "YES";
1120 } else if (top != "") {
1121 mstlab = "YES";
1122 }
1123 }
1124 if (opt.Contains("submaster")) {
1125 // Check if we have to add a var
1126 if (sub == "" || sub.BeginsWith("valgrind_opts:")) {
1127 sub.ReplaceAll("valgrind_opts:","");
1128 var.Form("%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1129 TProof::AddEnvVar("PROOF_SUBMASTER_WRAPPERCMD", var);
1130 mstlab = "YES";
1131 } else if (sub != "") {
1132 mstlab = "YES";
1133 }
1134 }
1135 if (opt.Contains("=workers") || opt.Contains("+workers")) {
1136 // Check if we have to add a var
1137 if (wrk == "" || wrk.BeginsWith("valgrind_opts:")) {
1138 wrk.ReplaceAll("valgrind_opts:","");
1139 var.Form("%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1140 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", var);
1141 TString nwrks("2");
1142 Int_t inw = opt.Index('#');
1143 if (inw != kNPOS) {
1144 nwrks = opt(inw+1, opt.Length());
1145 if (!nwrks.IsDigit()) nwrks = "2";
1146 }
1147 // Set the relevant variables
1148 if (!IsLite()) {
1149 TProof::AddEnvVar("PROOF_NWORKERS", nwrks);
1150 } else {
1151 gEnv->SetValue("ProofLite.Workers", nwrks.Atoi());
1152 }
1153 wrklab = nwrks;
1154 // Register the additional worker log in the session file
1155 // (for the master this is done automatically)
1156 TProof::AddEnvVar("PROOF_ADDITIONALLOG", "__valgrind__.log*");
1157 } else if (wrk != "") {
1158 wrklab = "ALL";
1159 }
1160 }
1161 // Increase the relevant timeouts
1162 if (!IsLite()) {
1163 TProof::AddEnvVar("PROOF_INTWAIT", "5000");
1164 gEnv->SetValue("Proof.SocketActivityTimeout", 6000);
1165 } else {
1166 gEnv->SetValue("ProofLite.StartupTimeOut", 5000);
1167 }
1168 // Warn for slowness
1169 Printf(" ");
1170 if (!IsLite()) {
1171 Printf(" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1172 } else {
1173 Printf(" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1174 }
1175 Printf(" ---> Please be patient: startup may be VERY slow ...");
1176 Printf(" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1177 Printf(" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1178 Printf(" ");
1179
1180 } else if (opt.BeginsWith("igprof-pp")) {
1181
1182 // IgProf profiling on master and worker. PROOF does not set the
1183 // environment for you: proper environment variables (like PATH and
1184 // LD_LIBRARY_PATH) should be set externally
1185
1186 Printf("*** Requested IgProf performance profiling ***");
1187 TString addLogExt = "__igprof.pp__.log";
1188 TString addLogFmt = "igprof -pk -pp -t proofserv.exe -o %s.%s";
1189 TString tmp;
1190
1191 if (IsLite()) {
1192 addLogFmt.Append("\"");
1193 addLogFmt.Prepend("\"");
1194 }
1195
1196 tmp.Form(addLogFmt.Data(), "<logfilemst>", addLogExt.Data());
1197 TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", tmp.Data());
1198
1199 tmp.Form(addLogFmt.Data(), "<logfilewrk>", addLogExt.Data());
1200 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1201
1202 TProof::AddEnvVar("PROOF_ADDITIONALLOG", addLogExt.Data());
1203
1204 } else if (opt.BeginsWith("cpupin=")) {
1205 // Enable CPU pinning. Takes as argument the list of processor IDs
1206 // that will be used in order. Processor IDs are numbered from 0,
1207 // use likwid to see how they are organized. A possible parameter
1208 // format would be:
1209 //
1210 // cpupin=3+4+0+9+10+22+7
1211 //
1212 // Only the specified processor IDs will be used in a round-robin
1213 // fashion, dealing with the fact that you can request more workers
1214 // than the number of processor IDs you have specified.
1215 //
1216 // To use all available processors in their order:
1217 //
1218 // cpupin=*
1219
1220 opt.Remove(0, 7);
1221
1222 // Remove any char which is neither a number nor a plus '+'
1223 for (Ssiz_t i=0; i<opt.Length(); i++) {
1224 Char_t c = opt[i];
1225 if ((c != '+') && ((c < '0') || (c > '9')))
1226 opt[i] = '_';
1227 }
1228 opt.ReplaceAll("_", "");
1229 TProof::AddEnvVar("PROOF_SLAVE_CPUPIN_ORDER", opt);
1230 #ifdef R__LINUX
1231 cpuPin = kTRUE;
1232 #endif
1233 } else if (opt.BeginsWith("workers=")) {
1234
1235 // Request for a given number of workers (within the max) or worker
1236 // startup combination:
1237 // workers=5 start max 5 workers (or less, if less are assigned)
1238 // workers=2x start max 2 workers per node (or less, if less are assigned)
1239 opt.ReplaceAll("workers=","");
1240 TProof::AddEnvVar("PROOF_NWORKERS", opt);
1241 }
1242 }
1243
1244 // In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1245 #ifdef R__LINUX
1246 if (IsLite() && cpuPin) {
1247 Printf("*** Requested CPU pinning ***");
1248 const TList *ev = GetEnvVars();
1249 const char *pinCmd = "taskset -c <cpupin>";
1250 TString val;
1251 TNamed *p;
1252 if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject("PROOF_SLAVE_WRAPPERCMD")))) {
1253 val = p->GetTitle();
1254 val.Insert(val.Length()-1, " ");
1255 val.Insert(val.Length()-1, pinCmd);
1256 }
1257 else {
1258 val.Form("\"%s\"", pinCmd);
1259 }
1260 TProof::AddEnvVar("PROOF_SLAVE_WRAPPERCMD", val.Data());
1261 }
1262 #endif
1263}
1264
1265////////////////////////////////////////////////////////////////////////////////
1266/// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1267/// that the path is writable
1268
1269Int_t TProof::AssertPath(const char *inpath, Bool_t writable)
1270{
1271 if (!inpath || strlen(inpath) <= 0) {
1272 Error("AssertPath", "undefined input path");
1273 return -1;
1274 }
1275
1276 TString path(inpath);
1277 gSystem->ExpandPathName(path);
1278
1279 if (gSystem->AccessPathName(path, kFileExists)) {
1280 if (gSystem->mkdir(path, kTRUE) != 0) {
1281 Error("AssertPath", "could not create path %s", path.Data());
1282 return -1;
1283 }
1284 }
1285 // It must be writable
1286 if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1287 if (gSystem->Chmod(path, 0666) != 0) {
1288 Error("AssertPath", "could not make path %s writable", path.Data());
1289 return -1;
1290 }
1291 }
1292
1293 // Done
1294 return 0;
1295}
1296
1297////////////////////////////////////////////////////////////////////////////////
1298/// Set manager and schedule its destruction after this for clean
1299/// operations.
1300
1302{
1303 fManager = mgr;
1304
1305 if (mgr) {
1307 gROOT->GetListOfSockets()->Remove(mgr);
1308 gROOT->GetListOfSockets()->Add(mgr);
1309 }
1310}
1311
1312////////////////////////////////////////////////////////////////////////////////
1313/// Works on the master node only.
1314/// It starts workers on the machines in workerList and sets the paths,
1315/// packages and macros as on the master.
1316/// It is a substitute for StartSlaves(...)
1317/// The code is mostly the master part of StartSlaves,
1318/// with the parallel startup removed.
1319
1321{
1322 if (!IsMaster()) {
1323 Error("AddWorkers", "AddWorkers can only be called on the master!");
1324 return -1;
1325 }
1326
1327 if (!workerList || !(workerList->GetSize())) {
1328 Error("AddWorkers", "empty list of workers!");
1329 return -2;
1330 }
1331
1332 // Code taken from master part of StartSlaves with the parllel part removed
1333
1335 if (fImage.IsNull())
1337
1338 // Get all workers
1339 UInt_t nSlaves = workerList->GetSize();
1340 UInt_t nSlavesDone = 0;
1341 Int_t ord = 0;
1342
1343 // Loop over all new workers and start them (if we had already workers it means we are
1344 // increasing parallelism or that is not the first time we are called)
1345 Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1346
1347 // A list of TSlave objects for workers that are being added
1348 TList *addedWorkers = new TList();
1349 if (!addedWorkers) {
1350 // This is needed to silence Coverity ...
1351 Error("AddWorkers", "cannot create new list for the workers to be added");
1352 return -2;
1353 }
1354 addedWorkers->SetOwner(kFALSE);
1355 TListIter next(workerList);
1356 TObject *to;
1357 TProofNodeInfo *worker;
1358 TSlaveInfo *dummysi = new TSlaveInfo();
1359 while ((to = next())) {
1360 // Get the next worker from the list
1361 worker = (TProofNodeInfo *)to;
1362
1363 // Read back worker node info
1364 const Char_t *image = worker->GetImage().Data();
1365 const Char_t *workdir = worker->GetWorkDir().Data();
1366 Int_t perfidx = worker->GetPerfIndex();
1367 Int_t sport = worker->GetPort();
1368 if (sport == -1)
1369 sport = fUrl.GetPort();
1370
1371 // Create worker server
1372 TString fullord;
1373 if (worker->GetOrdinal().Length() > 0) {
1374 fullord.Form("%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1375 } else {
1376 fullord.Form("%s.%d", gProofServ->GetOrdinal(), ord);
1377 }
1378
1379 // Remove worker from the list of workers terminated gracefully
1380 dummysi->SetOrdinal(fullord);
1382 SafeDelete(rmsi);
1383
1384 // Create worker server
1385 TString wn(worker->GetNodeName());
1386 if (wn == "localhost" || wn.BeginsWith("localhost.")) wn = gSystem->HostName();
1387 TUrl u(TString::Format("%s:%d", wn.Data(), sport));
1388 // Add group info in the password firdl, if any
1389 if (strlen(gProofServ->GetGroup()) > 0) {
1390 // Set also the user, otherwise the password is not exported
1391 if (strlen(u.GetUser()) <= 0)
1394 }
1395 TSlave *slave = 0;
1396 if (worker->IsWorker()) {
1397 slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1398 } else {
1399 slave = CreateSubmaster(u.GetUrl(), fullord,
1400 image, worker->GetMsd(), worker->GetNWrks());
1401 }
1402
1403 // Add to global list (we will add to the monitor list after
1404 // finalizing the server startup)
1405 Bool_t slaveOk = kTRUE;
1406 fSlaves->Add(slave);
1407 if (slave->IsValid()) {
1408 addedWorkers->Add(slave);
1409 } else {
1410 slaveOk = kFALSE;
1411 fBadSlaves->Add(slave);
1412 Warning("AddWorkers", "worker '%s' is invalid", slave->GetOrdinal());
1413 }
1414
1415 PDB(kGlobal,3)
1416 Info("AddWorkers", "worker on host %s created"
1417 " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1418
1419 // Notify opening of connection
1420 nSlavesDone++;
1422 m << TString("Opening connections to workers") << nSlaves
1423 << nSlavesDone << slaveOk;
1425
1426 ord++;
1427 } //end of the worker loop
1428 SafeDelete(dummysi);
1429
1430 // Cleanup
1431 SafeDelete(workerList);
1432
1433 nSlavesDone = 0;
1434
1435 // Here we finalize the server startup: in this way the bulk
1436 // of remote operations are almost parallelized
1437 TIter nxsl(addedWorkers);
1438 TSlave *sl = 0;
1439 while ((sl = (TSlave *) nxsl())) {
1440
1441 // Finalize setup of the server
1442 if (sl->IsValid())
1443 sl->SetupServ(TSlave::kSlave, 0);
1444
1445 // Monitor good slaves
1446 Bool_t slaveOk = kTRUE;
1447 if (sl->IsValid()) {
1448 fAllMonitor->Add(sl->GetSocket());
1449 PDB(kGlobal,3)
1450 Info("AddWorkers", "worker on host %s finalized"
1451 " and added to list", sl->GetOrdinal());
1452 } else {
1453 slaveOk = kFALSE;
1454 fBadSlaves->Add(sl);
1455 }
1456
1457 // Notify end of startup operations
1458 nSlavesDone++;
1460 m << TString("Setting up worker servers") << nSlaves
1461 << nSlavesDone << slaveOk;
1463 }
1464
1465 // Now set new state on the added workers (on all workers for simplicity)
1466 // use fEnabledPackages, fLoadedMacros,
1467 // gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1468 // no need to load packages that are only loaded and not enabled (dyn mode)
1469 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1470 TNamed *n = 0;
1471 if (TProof::GetEnvVars() &&
1472 (n = (TNamed *) TProof::GetEnvVars()->FindObject("PROOF_NWORKERS"))) {
1473 TString s(n->GetTitle());
1474 if (s.IsDigit()) nwrk = s.Atoi();
1475 }
1476
1477 if (fDynamicStartup && goMoreParallel) {
1478
1479 PDB(kGlobal, 3)
1480 Info("AddWorkers", "will invoke GoMoreParallel()");
1481 Int_t nw = GoMoreParallel(nwrk);
1482 PDB(kGlobal, 3)
1483 Info("AddWorkers", "GoMoreParallel()=%d", nw);
1484
1485 }
1486 else {
1487 // Not in Dynamic Workers mode
1488 PDB(kGlobal, 3)
1489 Info("AddWorkers", "will invoke GoParallel()");
1490 GoParallel(nwrk, kFALSE, 0);
1491 }
1492
1493 // Set worker processing environment
1494 SetupWorkersEnv(addedWorkers, goMoreParallel);
1495
1496 // Update list of current workers
1497 PDB(kGlobal, 3)
1498 Info("AddWorkers", "will invoke SaveWorkerInfo()");
1500
1501 // Inform the client that the number of workers has changed
1502 if (fDynamicStartup && gProofServ) {
1503 PDB(kGlobal, 3)
1504 Info("AddWorkers", "will invoke SendParallel()");
1506
1507 if (goMoreParallel && fPlayer) {
1508 // In case we are adding workers dynamically to an existing process, we
1509 // should invoke a special player's Process() to set only added workers
1510 // to the proper state
1511 PDB(kGlobal, 3)
1512 Info("AddWorkers", "will send the PROCESS message to selected workers");
1513 fPlayer->JoinProcess(addedWorkers);
1514 // Update merger counters (new workers are not yet active)
1515 fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1516 }
1517 }
1518
1519 // Cleanup
1520 delete addedWorkers;
1521
1522 return 0;
1523}
1524
1525////////////////////////////////////////////////////////////////////////////////
1526/// Set up packages, loaded macros, include and lib paths ...
1527
1528void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1529{
1530 TList *server_packs = gProofServ ? gProofServ->GetEnabledPackages() : nullptr;
1531 // Packages
1532 TList *packs = server_packs ? server_packs : GetEnabledPackages();
1533 if (packs && packs->GetSize() > 0) {
1534 TIter nxp(packs);
1535 TPair *pck = 0;
1536 while ((pck = (TPair *) nxp())) {
1537 // Upload and Enable methods are intelligent and avoid
1538 // re-uploading or re-enabling of a package to a node that has it.
1539 if (fDynamicStartup && increasingWorkers) {
1540 // Upload only on added workers
1541 PDB(kGlobal, 3)
1542 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
1543 if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1544 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1545 } else {
1546 PDB(kGlobal, 3)
1547 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1548 if (UploadPackage(pck->GetName()) >= 0)
1549 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1550 }
1551 }
1552 }
1553
1554 if (server_packs) {
1555 server_packs->Delete();
1556 delete server_packs;
1557 }
1558
1559 // Loaded macros
1560 if (fLoadedMacros) {
1561 TIter nxp(fLoadedMacros);
1562 TObjString *os = 0;
1563 while ((os = (TObjString *) nxp())) {
1564 PDB(kGlobal, 3) {
1565 Info("SetupWorkersEnv", "will invoke Load() on selected workers");
1566 Printf("Loading a macro : %s", os->GetName());
1567 }
1568 Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1569 }
1570 }
1571
1572 // Dynamic path
1574 dyn.ReplaceAll(":", " ");
1575 dyn.ReplaceAll("\"", " ");
1576 PDB(kGlobal, 3)
1577 Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1578 AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE); // Do not Collect
1579
1580 // Include path
1582 inc.ReplaceAll("-I", " ");
1583 inc.ReplaceAll("\"", " ");
1584 PDB(kGlobal, 3)
1585 Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1586 AddIncludePath(inc, kFALSE, addedWorkers, kFALSE); // Do not Collect
1587
1588 // Done
1589 return;
1590}
1591
1592////////////////////////////////////////////////////////////////////////////////
1593/// Used for shutting down the workers after a query is finished.
1594/// Sends a kPROOF_STOP message to each of the workers in workerList.
1595/// If workerList == 0, shut down all the workers.
1596
1598{
1599 if (!IsMaster()) {
1600 Error("RemoveWorkers", "RemoveWorkers can only be called on the master!");
1601 return -1;
1602 }
1603
1604 fFileMap.clear(); // This could be avoided if CopyFromCache was used in SendFile
1605
1606 if (!workerList) {
1607 // shutdown all the workers
1608 TIter nxsl(fSlaves);
1609 TSlave *sl = 0;
1610 while ((sl = (TSlave *) nxsl())) {
1611 // Shut down the worker assumig that it is not processing
1612 TerminateWorker(sl);
1613 }
1614
1615 } else {
1616 if (!(workerList->GetSize())) {
1617 Error("RemoveWorkers", "The list of workers should not be empty!");
1618 return -2;
1619 }
1620
1621 // Loop over all the workers and stop them
1622 TListIter next(workerList);
1623 TObject *to;
1624 TProofNodeInfo *worker;
1625 while ((to = next())) {
1626 TSlave *sl = 0;
1627 if (!strcmp(to->ClassName(), "TProofNodeInfo")) {
1628 // Get the next worker from the list
1629 worker = (TProofNodeInfo *)to;
1630 TIter nxsl(fSlaves);
1631 while ((sl = (TSlave *) nxsl())) {
1632 // Shut down the worker assumig that it is not processing
1633 if (sl->GetName() == worker->GetNodeName())
1634 break;
1635 }
1636 } else if (to->InheritsFrom(TSlave::Class())) {
1637 sl = (TSlave *) to;
1638 } else {
1639 Warning("RemoveWorkers","unknown object type: %s - it should be"
1640 " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1641 }
1642 // Shut down the worker assumig that it is not processing
1643 if (sl) {
1644 if (gDebug > 0)
1645 Info("RemoveWorkers","terminating worker %s", sl->GetOrdinal());
1646 TerminateWorker(sl);
1647 }
1648 }
1649 }
1650
1651 // Update also the master counter
1652 if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker("master");
1653
1654 return 0;
1655}
1656
1657////////////////////////////////////////////////////////////////////////////////
1658/// Start up PROOF slaves.
1659
1661{
1662 // If this is a master server, find the config file and start slave
1663 // servers as specified in the config file
1665
1666 Int_t pc = 0;
1667 TList *workerList = new TList;
1668 // Get list of workers
1669 if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1670 TString emsg("no resource currently available for this session: please retry later");
1671 if (gDebug > 0) Info("StartSlaves", "%s", emsg.Data());
1673 return kFALSE;
1674 }
1675 // Setup the workers
1676 if (AddWorkers(workerList) < 0)
1677 return kFALSE;
1678
1679 } else {
1680
1681 // create master server
1682 Printf("Starting master: opening connection ...");
1683 TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
1684
1685 if (slave->IsValid()) {
1686
1687 // Notify
1688 fprintf(stderr,"Starting master:"
1689 " connection open: setting up server ... \r");
1690 StartupMessage("Connection to master opened", kTRUE, 1, 1);
1691
1692 if (!attach) {
1693
1694 // Set worker interrupt handler
1695 slave->SetInterruptHandler(kTRUE);
1696
1697 // Finalize setup of the server
1699
1700 if (slave->IsValid()) {
1701
1702 // Notify
1703 Printf("Starting master: OK ");
1704 StartupMessage("Master started", kTRUE, 1, 1);
1705
1706 // check protocol compatibility
1707 // protocol 1 is not supported anymore
1708 if (fProtocol == 1) {
1709 Error("StartSlaves",
1710 "client and remote protocols not compatible (%d and %d)",
1712 slave->Close("S");
1713 delete slave;
1714 return kFALSE;
1715 }
1716
1717 fSlaves->Add(slave);
1718 fAllMonitor->Add(slave->GetSocket());
1719
1720 // Unset worker interrupt handler
1722
1723 // Set interrupt PROOF handler from now on
1725
1726 // Give-up after 5 minutes
1727 Int_t rc = Collect(slave, 300);
1728 Int_t slStatus = slave->GetStatus();
1729 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1730 fSlaves->Remove(slave);
1731 fAllMonitor->Remove(slave->GetSocket());
1732 if (slStatus == -99)
1733 Error("StartSlaves", "no resources available or problems setting up workers (check logs)");
1734 else if (slStatus == -98)
1735 Error("StartSlaves", "could not setup output redirection on master");
1736 else
1737 Error("StartSlaves", "setting up master");
1738 slave->Close("S");
1739 delete slave;
1740 return 0;
1741 }
1742
1743 if (!slave->IsValid()) {
1744 fSlaves->Remove(slave);
1745 fAllMonitor->Remove(slave->GetSocket());
1746 slave->Close("S");
1747 delete slave;
1748 Error("StartSlaves",
1749 "failed to setup connection with PROOF master server");
1750 return kFALSE;
1751 }
1752
1753 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1754 if ((fProgressDialog =
1755 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1756 if (fProgressDialog->LoadPlugin() == -1)
1757 fProgressDialog = 0;
1758 }
1759 } else {
1760 // Notify
1761 Printf("Starting master: failure");
1762 }
1763 } else {
1764
1765 // Notify
1766 Printf("Starting master: OK ");
1767 StartupMessage("Master attached", kTRUE, 1, 1);
1768
1769 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1770 if ((fProgressDialog =
1771 gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
1772 if (fProgressDialog->LoadPlugin() == -1)
1773 fProgressDialog = 0;
1774 }
1775
1776 fSlaves->Add(slave);
1778 }
1779
1780 } else {
1781 delete slave;
1782 // Notify only if verbosity is on: most likely the failure has already been notified
1783 if (gDebug > 0)
1784 Error("StartSlaves", "failed to create (or connect to) the PROOF master server");
1785 return kFALSE;
1786 }
1787 }
1788
1789 return kTRUE;
1790}
1791
1792////////////////////////////////////////////////////////////////////////////////
1793/// Close all open slave servers.
1794/// Client can decide to shut down the remote session by passing option 'S'
1795/// or 's'. Default for clients is detach, if supported. Masters always
1796/// shut down the remote counterpart.
1797
1799{
1800 { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1801
1802 fValid = kFALSE;
1803 if (fSlaves) {
1804 if (fIntHandler)
1806
1807 TIter nxs(fSlaves);
1808 TSlave *sl = 0;
1809 while ((sl = (TSlave *)nxs()))
1810 sl->Close(opt);
1811
1812 fActiveSlaves->Clear("nodelete");
1813 fUniqueSlaves->Clear("nodelete");
1814 fAllUniqueSlaves->Clear("nodelete");
1815 fNonUniqueMasters->Clear("nodelete");
1816 fBadSlaves->Clear("nodelete");
1817 fInactiveSlaves->Clear("nodelete");
1818 fSlaves->Delete();
1819 }
1820 }
1821
1823 gROOT->GetListOfSockets()->Remove(this);
1824
1825 if (fChains) {
1826 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1827 // remove "chain" from list
1828 chain->SetProof(0);
1829 RemoveChain(chain);
1830 }
1831 }
1832
1833 if (IsProofd()) {
1834
1835 gROOT->GetListOfProofs()->Remove(this);
1836 if (gProof && gProof == this) {
1837 // Set previous proofd-related as default
1838 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1839 while ((gProof = (TProof *)pvp())) {
1840 if (gProof->IsProofd())
1841 break;
1842 }
1843 }
1844 }
1845 }
1846}
1847
1848////////////////////////////////////////////////////////////////////////////////
1849/// Create a new TSlave of type TSlave::kSlave.
1850/// Note: creation of TSlave is private with TProof as a friend.
1851/// Derived classes must use this function to create slaves.
1852
1853TSlave *TProof::CreateSlave(const char *url, const char *ord,
1854 Int_t perf, const char *image, const char *workdir)
1855{
1856 TSlave* sl = TSlave::Create(url, ord, perf, image,
1857 this, TSlave::kSlave, workdir, 0);
1858
1859 if (sl->IsValid()) {
1860 sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1861 // must set fParallel to 1 for slaves since they do not
1862 // report their fParallel with a LOG_DONE message
1863 sl->fParallel = 1;
1864 }
1865
1866 return sl;
1867}
1868
1869
1870////////////////////////////////////////////////////////////////////////////////
1871/// Create a new TSlave of type TSlave::kMaster.
1872/// Note: creation of TSlave is private with TProof as a friend.
1873/// Derived classes must use this function to create slaves.
1874
1875TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
1876 const char *image, const char *msd, Int_t nwk)
1877{
1878 TSlave *sl = TSlave::Create(url, ord, 100, image, this,
1879 TSlave::kMaster, 0, msd, nwk);
1880
1881 if (sl->IsValid()) {
1882 sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
1883 }
1884
1885 return sl;
1886}
1887
1888////////////////////////////////////////////////////////////////////////////////
1889/// Find slave that has TSocket s. Returns 0 in case slave is not found.
1890
1892{
1893 TSlave *sl;
1894 TIter next(fSlaves);
1895
1896 while ((sl = (TSlave *)next())) {
1897 if (sl->IsValid() && sl->GetSocket() == s)
1898 return sl;
1899 }
1900 return 0;
1901}
1902
1903////////////////////////////////////////////////////////////////////////////////
1904/// Add to the fUniqueSlave list the active slaves that have a unique
1905/// (user) file system image. This information is used to transfer files
1906/// only once to nodes that share a file system (an image). Submasters
1907/// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1908/// list. That list is used to trigger the transferring of files to
1909/// the submaster's unique slaves without the need to transfer the file
1910/// to the submaster.
1911
1913{
1919
1920 TIter next(fActiveSlaves);
1921
1922 while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1923 if (fImage == sl->fImage) {
1924 if (sl->GetSlaveType() == TSlave::kMaster) {
1926 fAllUniqueSlaves->Add(sl);
1927 fAllUniqueMonitor->Add(sl->GetSocket());
1928 }
1929 continue;
1930 }
1931
1932 TIter next2(fUniqueSlaves);
1933 TSlave *replace_slave = 0;
1934 Bool_t add = kTRUE;
1935 while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1936 if (sl->fImage == sl2->fImage) {
1937 add = kFALSE;
1938 if (sl->GetSlaveType() == TSlave::kMaster) {
1939 if (sl2->GetSlaveType() == TSlave::kSlave) {
1940 // give preference to master
1941 replace_slave = sl2;
1942 add = kTRUE;
1943 } else if (sl2->GetSlaveType() == TSlave::kMaster) {
1945 fAllUniqueSlaves->Add(sl);
1946 fAllUniqueMonitor->Add(sl->GetSocket());
1947 } else {
1948 Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
1949 R__ASSERT(0);
1950 }
1951 }
1952 break;
1953 }
1954 }
1955
1956 if (add) {
1957 fUniqueSlaves->Add(sl);
1958 fAllUniqueSlaves->Add(sl);
1959 fUniqueMonitor->Add(sl->GetSocket());
1960 fAllUniqueMonitor->Add(sl->GetSocket());
1961 if (replace_slave) {
1962 fUniqueSlaves->Remove(replace_slave);
1963 fAllUniqueSlaves->Remove(replace_slave);
1964 fUniqueMonitor->Remove(replace_slave->GetSocket());
1965 fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1966 }
1967 }
1968 }
1969
1970 // will be actiavted in Collect()
1973}
1974
1975////////////////////////////////////////////////////////////////////////////////
1976/// Return number of slaves as described in the config file.
1977
1979{
1980 return fSlaves->GetSize();
1981}
1982
1983////////////////////////////////////////////////////////////////////////////////
1984/// Return number of active slaves, i.e. slaves that are valid and in
1985/// the current computing group.
1986
1988{
1989 return fActiveSlaves->GetSize();
1990}
1991
1992////////////////////////////////////////////////////////////////////////////////
1993/// Return number of inactive slaves, i.e. slaves that are valid but not in
1994/// the current computing group.
1995
1997{
1998 return fInactiveSlaves->GetSize();
1999}
2000
2001////////////////////////////////////////////////////////////////////////////////
2002/// Return number of unique slaves, i.e. active slaves that each have a
2003/// unique user file system.
2004
2006{
2007 return fUniqueSlaves->GetSize();
2008}
2009
2010////////////////////////////////////////////////////////////////////////////////
2011/// Return number of bad slaves. These are slaves that were in the config
2012/// file, but refused to start up or that died during the PROOF session.
2013
2015{
2016 return fBadSlaves->GetSize();
2017}
2018
2019////////////////////////////////////////////////////////////////////////////////
2020/// Ask for the statistics of the slaves.
2021
2023{
2024 if (!IsValid()) return;
2025
2028}
2029
2030////////////////////////////////////////////////////////////////////////////////
2031/// Get statistics about CPU time, real time and bytes read.
2032/// If verbose, print the results (always available via GetCpuTime(), GetRealTime()
2033/// and GetBytesRead()).
2034
2036{
2037 if (fProtocol > 27) {
2038 // This returns the correct result
2039 AskStatistics();
2040 } else {
2041 // AskStatistics is buggy: parse the output of Print()
2044 Print();
2045 gSystem->RedirectOutput(0, 0, &rh);
2046 TMacro *mp = GetLastLog();
2047 if (mp) {
2048 // Look for global directories
2049 TIter nxl(mp->GetListOfLines());
2050 TObjString *os = 0;
2051 while ((os = (TObjString *) nxl())) {
2052 TString s(os->GetName());
2053 if (s.Contains("Total MB's processed:")) {
2054 s.ReplaceAll("Total MB's processed:", "");
2055 if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2056 } else if (s.Contains("Total real time used (s):")) {
2057 s.ReplaceAll("Total real time used (s):", "");
2058 if (s.IsFloat()) fRealTime = s.Atof();
2059 } else if (s.Contains("Total CPU time used (s):")) {
2060 s.ReplaceAll("Total CPU time used (s):", "");
2061 if (s.IsFloat()) fCpuTime = s.Atof();
2062 }
2063 }
2064 delete mp;
2065 }
2066 }
2067
2068 if (verbose) {
2069 Printf(" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2070 GetRealTime(), GetCpuTime(), GetParallel(), float(GetBytesRead())/(1024*1024));
2071 }
2072}
2073
2074////////////////////////////////////////////////////////////////////////////////
2075/// Ask for the number of parallel slaves.
2076
2078{
2079 if (!IsValid()) return;
2080
2083}
2084
2085////////////////////////////////////////////////////////////////////////////////
2086/// Ask the master for the list of queries.
2087
2089{
2090 if (!IsValid() || TestBit(TProof::kIsMaster)) return (TList *)0;
2091
2092 Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
2094 m << all;
2097
2098 // This should have been filled by now
2099 return fQueries;
2100}
2101
2102////////////////////////////////////////////////////////////////////////////////
2103/// Number of queries processed by this session
2104
2106{
2107 if (fQueries)
2108 return fQueries->GetSize() - fOtherQueries;
2109 return 0;
2110}
2111
2112////////////////////////////////////////////////////////////////////////////////
2113/// Set max number of draw queries whose results are saved
2114
2116{
2117 if (max > 0) {
2118 if (fPlayer)
2120 fMaxDrawQueries = max;
2121 }
2122}
2123
2124////////////////////////////////////////////////////////////////////////////////
2125/// Get max number of queries whose full results are kept in the
2126/// remote sandbox
2127
2129{
2131 m << kFALSE;
2134}
2135
2136////////////////////////////////////////////////////////////////////////////////
2137/// Return pointer to the list of query results in the player
2138
2140{
2141 return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2142}
2143
2144////////////////////////////////////////////////////////////////////////////////
2145/// Return pointer to the full TQueryResult instance owned by the player
2146/// and referenced by 'ref'. If ref = 0 or "", return the last query result.
2147
2149{
2150 return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2151}
2152
2153////////////////////////////////////////////////////////////////////////////////
2154/// Ask the master for the list of queries.
2155/// Options:
2156/// "A" show information about all the queries known to the
2157/// server, i.e. even those processed by other sessions
2158/// "L" show only information about queries locally available
2159/// i.e. already retrieved. If "L" is specified, "A" is
2160/// ignored.
2161/// "F" show all details available about queries
2162/// "H" print help menu
2163/// Default ""
2164
2166{
2167 Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
2168 if (help) {
2169
2170 // Help
2171
2172 Printf("+++");
2173 Printf("+++ Options: \"A\" show all queries known to server");
2174 Printf("+++ \"L\" show retrieved queries");
2175 Printf("+++ \"F\" full listing of query info");
2176 Printf("+++ \"H\" print this menu");
2177 Printf("+++");
2178 Printf("+++ (case insensitive)");
2179 Printf("+++");
2180 Printf("+++ Use Retrieve(<#>) to retrieve the full"
2181 " query results from the master");
2182 Printf("+++ e.g. Retrieve(8)");
2183
2184 Printf("+++");
2185
2186 return;
2187 }
2188
2189 if (!IsValid()) return;
2190
2191 Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
2192
2193 TObject *pq = 0;
2194 if (!local) {
2195 GetListOfQueries(opt);
2196
2197 if (!fQueries) return;
2198
2199 TIter nxq(fQueries);
2200
2201 // Queries processed by other sessions
2202 if (fOtherQueries > 0) {
2203 Printf("+++");
2204 Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
2205 Int_t nq = 0;
2206 while (nq++ < fOtherQueries && (pq = nxq()))
2207 pq->Print(opt);
2208 }
2209
2210 // Queries processed by this session
2211 Printf("+++");
2212 Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2214 while ((pq = nxq()))
2215 pq->Print(opt);
2216
2217 } else {
2218
2219 // Queries processed by this session
2220 Printf("+++");
2221 Printf("+++ Queries processed during this session: selector: %d, draw: %d",
2223
2224 // Queries available locally
2225 TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2226 if (listlocal) {
2227 Printf("+++");
2228 Printf("+++ Queries available locally: %d", listlocal->GetSize());
2229 TIter nxlq(listlocal);
2230 while ((pq = nxlq()))
2231 pq->Print(opt);
2232 }
2233 }
2234 Printf("+++");
2235}
2236
2237////////////////////////////////////////////////////////////////////////////////
2238/// See if the data is ready to be analyzed.
/// fDataReady is set to kTRUE and fBytesReady/fTotalBytes are reset to 0,
/// then kPROOF_DATA_READY is sent to the active sub-masters (if any) and their
/// replies are collected. The counters are then copied into 'bytesready' and
/// 'totalbytes' and the "IsDataReady(Long64_t,Long64_t)" signal is emitted
/// with (totalbytes, bytesready).
/// Returns kFALSE (leaving the output arguments untouched) if the session is
/// not valid, fDataReady otherwise (presumably updated by the sub-master
/// replies handled during Collect() -- confirm).
2239
2241{
2242 if (!IsValid()) return kFALSE;
2243
// Select the sub-masters among the active workers
2244 TList submasters;
2245 TIter nextSlave(GetListOfActiveSlaves());
2246 while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2247 if (sl->GetSlaveType() == TSlave::kMaster) {
2248 submasters.Add(sl);
2249 }
2250 }
2251
2252 fDataReady = kTRUE; //see if any submasters set it to false
2253 fBytesReady = 0;
2254 fTotalBytes = 0;
2255 //loop over submasters and see if data is ready
2256 if (submasters.GetSize() > 0) {
2257 Broadcast(kPROOF_DATA_READY, &submasters);
2258 Collect(&submasters);
2259 }
2260
2261 bytesready = fBytesReady;
2262 totalbytes = fTotalBytes;
2263
2264 EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2265
2266 PDB(kGlobal,2)
2267 Info("IsDataReady", "%lld / %lld (%s)",
2268 bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2269
2270 return fDataReady;
2271}
2272
2273////////////////////////////////////////////////////////////////////////////////
2274/// Send interrupt to master or slave servers.
/// 'list' selects the target workers (kAll, kActive, kUnique or kAllUnique).
/// Each valid worker in the selected list gets sl->Interrupt((Int_t)type).
/// Nothing is done if the session is not valid or the selected list is empty.
2275
2277{
2278 if (!IsValid()) return;
2279
2280 TList *slaves = 0;
2281 if (list == kAll) slaves = fSlaves;
2282 if (list == kActive) slaves = fActiveSlaves;
2283 if (list == kUnique) slaves = fUniqueSlaves;
2284 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2285
// NOTE(review): 'slaves' stays 0 if 'list' matches none of the values above
// (or if the selected member list is 0). GetSize() below would then dereference
// a null pointer. Broadcast(const TMessage &, TList *) guards with '!slaves'.
2286 if (slaves->GetSize() == 0) return;
2287
2288 TSlave *sl;
2289 TIter next(slaves);
2290
2291 while ((sl = (TSlave *)next())) {
2292 if (sl->IsValid()) {
2293
2294 // Ask slave to propagate the interrupt request
2295 sl->Interrupt((Int_t)type);
2296 }
2297 }
2298}
2299
2300////////////////////////////////////////////////////////////////////////////////
2301/// Returns number of slaves active in parallel mode. Returns 0 in case
2302/// there are no active slaves. Returns -1 in case of error.
/// The result is the sum of GetParallel() over the active workers; negative
/// per-worker values are skipped. "Error" here means the session is not valid.
2303
2305{
2306 if (!IsValid()) return -1;
2307
2308 // Sum the parallelism reported by each active worker (negative values are skipped)
2309 TIter nextSlave(GetListOfActiveSlaves());
2310 Int_t nparallel = 0;
2311 while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2312 if (sl->GetParallel() >= 0)
2313 nparallel += sl->GetParallel();
2314
2315 return nparallel;
2316}
2317
2318////////////////////////////////////////////////////////////////////////////////
2319/// Returns list of TSlaveInfo's. In case of error return 0.
/// fSlaveInfo is emptied with Delete() when it already exists, and is then
/// refilled with one TSlaveInfo per worker of type TSlave::kSlave. Each entry
/// is flagged kActive or kBad when the worker's ordinal is found in the active
/// or bad lists. Every valid worker and sub-master that is successfully sent
/// kPROOF_GETSLAVEINFO is collected before returning; nodes whose send fails
/// are marked bad. Returns 0 if the session is not valid.
2320
2322{
2323 if (!IsValid()) return 0;
2324
2325 if (fSlaveInfo == 0) {
2328 } else {
2329 fSlaveInfo->Delete();
2330 }
2331
// NOTE: despite its name, 'masters' holds every node (worker or sub-master)
// that was successfully sent kPROOF_GETSLAVEINFO, i.e. the nodes to collect.
2332 TList masters;
2333 TIter next(GetListOfSlaves());
2334 TSlave *slave;
2335
2336 while ((slave = (TSlave *) next()) != 0) {
2337 if (slave->GetSlaveType() == TSlave::kSlave) {
// In PROOF-Lite the local host name is reported for every worker
2338 const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2339 TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
2340 name,
2341 slave->GetPerfIdx());
2342 fSlaveInfo->Add(slaveinfo);
2343
2344 TIter nextactive(GetListOfActiveSlaves());
2345 TSlave *activeslave;
2346 while ((activeslave = (TSlave *) nextactive())) {
2347 if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2348 slaveinfo->SetStatus(TSlaveInfo::kActive);
2349 break;
2350 }
2351 }
2352
// A worker found in the bad list overrides the kActive status set above
2353 TIter nextbad(GetListOfBadSlaves());
2354 TSlave *badslave;
2355 while ((badslave = (TSlave *) nextbad())) {
2356 if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2357 slaveinfo->SetStatus(TSlaveInfo::kBad);
2358 break;
2359 }
2360 }
2361 // Get system info if supported
2362 if (slave->IsValid()) {
2363 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2364 MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2365 else
2366 masters.Add(slave);
2367 }
2368
2369 } else if (slave->GetSlaveType() == TSlave::kMaster) {
2370 if (slave->IsValid()) {
2371 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2372 MarkBad(slave, "could not send kPROOF_GETSLAVEINFO message");
2373 else
2374 masters.Add(slave);
2375 }
2376 } else {
// Any other slave type is treated as a programming error
2377 Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
2378 R__ASSERT(0);
2379 }
2380 }
2381 if (masters.GetSize() > 0) Collect(&masters);
2382
2383 return fSlaveInfo;
2384}
2385
2386////////////////////////////////////////////////////////////////////////////////
2387/// Activate slave server list.
/// All sockets in fAllMonitor are first deactivated. The sockets of the valid
/// workers in 'slaves' are then activated in fAllMonitor; fActiveSlaves is
/// used when 'slaves' is 0.
2388
2390{
2391 TMonitor *mon = fAllMonitor;
2392 mon->DeActivateAll();
2393
2394 slaves = !slaves ? fActiveSlaves : slaves;
2395
2396 TIter next(slaves);
2397 TSlave *sl;
2398 while ((sl = (TSlave*) next())) {
2399 if (sl->IsValid())
2400 mon->Activate(sl->GetSocket());
2401 }
2402}
2403
2404////////////////////////////////////////////////////////////////////////////////
2405/// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2406/// monitored by 'mon'.
/// When 'mon' is 0, the current monitor (fCurrentMonitor) is used instead.
/// If that is also 0, the call does nothing.
2407
2409{
2410 TMonitor *m = (mon) ? mon : fCurrentMonitor;
2411 if (m) {
2412 if (on)
2413 m->ActivateAll();
2414 else
2415 m->DeActivateAll();
2416 }
2417}
2418
2419////////////////////////////////////////////////////////////////////////////////
2420/// Broadcast the group priority to all workers in the specified list. Returns
2421/// the number of workers the message was successfully sent to.
2422/// Returns -1 in case of error.
2423
2424Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers)
2425{
2426 if (!IsValid()) return -1;
2427
2428 if (workers->GetSize() == 0) return 0;
2429
2430 int nsent = 0;
2431 TIter next(workers);
2432
2433 TSlave *wrk;
2434 while ((wrk = (TSlave *)next())) {
2435 if (wrk->IsValid()) {
2436 if (wrk->SendGroupPriority(grp, priority) == -1)
2437 MarkBad(wrk, "could not send group priority");
2438 else
2439 nsent++;
2440 }
2441 }
2442
2443 return nsent;
2444}
2445
2446////////////////////////////////////////////////////////////////////////////////
2447/// Broadcast the group priority to all workers in the specified list. Returns
2448/// the number of workers the message was successfully sent to.
2449/// Returns -1 in case of error.
2450
2451Int_t TProof::BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list)
2452{
2453 TList *workers = 0;
2454 if (list == kAll) workers = fSlaves;
2455 if (list == kActive) workers = fActiveSlaves;
2456 if (list == kUnique) workers = fUniqueSlaves;
2457 if (list == kAllUnique) workers = fAllUniqueSlaves;
2458
2459 return BroadcastGroupPriority(grp, priority, workers);
2460}
2461
2462////////////////////////////////////////////////////////////////////////////////
2463/// Reset the merge progress notifier.
2464
2466{
2468}
2469
2470////////////////////////////////////////////////////////////////////////////////
2471/// Broadcast a message to all slaves in the specified list. Returns
2472/// the number of slaves the message was successfully sent to.
2473/// Returns -1 in case of error.
/// Invalid workers are skipped. A worker whose socket Send() returns -1 is
/// marked bad and not counted. A null or empty list yields 0; -1 is returned
/// only when the session itself is not valid.
2474
2476{
2477 if (!IsValid()) return -1;
2478
2479 if (!slaves || slaves->GetSize() == 0) return 0;
2480
2481 int nsent = 0;
2482 TIter next(slaves);
2483
2484 TSlave *sl;
2485 while ((sl = (TSlave *)next())) {
2486 if (sl->IsValid()) {
2487 if (sl->GetSocket()->Send(mess) == -1)
2488 MarkBad(sl, "could not broadcast request");
2489 else
2490 nsent++;
2491 }
2492 }
2493
2494 return nsent;
2495}
2496
2497////////////////////////////////////////////////////////////////////////////////
2498/// Broadcast a message to all slaves in the predefined list 'list' (kAll,
2499/// kActive, kUnique or kAllUnique). Returns the number of slaves
2500/// the message was successfully sent to. Returns -1 in case of error.
/// Any other 'list' value maps to a null list, which the TList overload
/// treats as empty.
2501
2503{
2504 TList *slaves = 0;
2505 if (list == kAll) slaves = fSlaves;
2506 if (list == kActive) slaves = fActiveSlaves;
2507 if (list == kUnique) slaves = fUniqueSlaves;
2508 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2509
2510 return Broadcast(mess, slaves);
2511}
2512
2513////////////////////////////////////////////////////////////////////////////////
2514/// Broadcast a character string buffer to all slaves in the specified
2515/// list. Use kind to set the TMessage what field. Returns the number of
2516/// slaves the message was sent to. Returns -1 in case of error.
2517
2518Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
2519{
2520 TMessage mess(kind);
2521 if (str) mess.WriteString(str);
2522 return Broadcast(mess, slaves);
2523}
2524
2525////////////////////////////////////////////////////////////////////////////////
2526/// Broadcast a character string buffer to all slaves in the specified
2527/// list (either all slaves or only the active slaves). Use kind to
2528/// set the TMessage what field. Returns the number of slaves the message
2529/// was sent to. Returns -1 in case of error.
2530
2531Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
2532{
2533 TMessage mess(kind);
2534 if (str) mess.WriteString(str);
2535 return Broadcast(mess, list);
2536}
2537
2538////////////////////////////////////////////////////////////////////////////////
2539/// Broadcast an object to all slaves in the specified list. Use kind to
2540/// set the TMessage what field. Returns the number of slaves the message
2541/// was sent to. Returns -1 in case of error.
/// The object is serialized once into a single TMessage, which is then sent
/// to every worker through the TMessage overload of Broadcast().
2542
2544{
2545 TMessage mess(kind);
2546 mess.WriteObject(obj);
2547 return Broadcast(mess, slaves);
2548}
2549
2550////////////////////////////////////////////////////////////////////////////////
2551/// Broadcast an object to all slaves in the predefined list 'list' (kAll,
2552/// kActive, kUnique or kAllUnique). Use kind to set the TMessage what field.
2553/// Returns the number of slaves the message was sent to. Returns -1 in case of error.
/// The object is serialized once into a single TMessage, which is then sent
/// through the TMessage overload of Broadcast().
2554
2556{
2557 TMessage mess(kind);
2558 mess.WriteObject(obj);
2559 return Broadcast(mess, list);
2560}
2561
2562////////////////////////////////////////////////////////////////////////////////
2563/// Broadcast a raw buffer of specified length to all slaves in the
2564/// specified list. Returns the number of slaves the buffer was sent to.
2565/// Returns -1 in case of error.
2566
2567Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
2568{
2569 if (!IsValid()) return -1;
2570
2571 if (slaves->GetSize() == 0) return 0;
2572
2573 int nsent = 0;
2574 TIter next(slaves);
2575
2576 TSlave *sl;
2577 while ((sl = (TSlave *)next())) {
2578 if (sl->IsValid()) {
2579 if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2580 MarkBad(sl, "could not send broadcast-raw request");
2581 else
2582 nsent++;
2583 }
2584 }
2585
2586 return nsent;
2587}
2588
2589////////////////////////////////////////////////////////////////////////////////
2590/// Broadcast a raw buffer of specified length to all slaves in the
2591/// specified list. Returns the number of slaves the buffer was sent to.
2592/// Returns -1 in case of error.
/// 'list' selects kAll, kActive, kUnique or kAllUnique, and the work is
/// delegated to the TList overload of BroadcastRaw().
/// NOTE(review): any other 'list' value passes a null TList pointer to the
/// TList overload.
2593
2595{
2596 TList *slaves = 0;
2597 if (list == kAll) slaves = fSlaves;
2598 if (list == kActive) slaves = fActiveSlaves;
2599 if (list == kUnique) slaves = fUniqueSlaves;
2600 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2601
2602 return BroadcastRaw(buffer, length, slaves);
2603}
2604
2605////////////////////////////////////////////////////////////////////////////////
2606/// Broadcast file to all workers in the specified list. Returns the number of workers
2607/// the buffer was sent to.
2608/// Returns -1 in case of error.
2609
2610Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
2611{
2612 if (!IsValid()) return -1;
2613
2614 if (wrks->GetSize() == 0) return 0;
2615
2616 int nsent = 0;
2617 TIter next(wrks);
2618
2619 TSlave *wrk;
2620 while ((wrk = (TSlave *)next())) {
2621 if (wrk->IsValid()) {
2622 if (SendFile(file, opt, rfile, wrk) < 0)
2623 Error("BroadcastFile",
2624 "problems sending file to worker %s (%s)",
2625 wrk->GetOrdinal(), wrk->GetName());
2626 else
2627 nsent++;
2628 }
2629 }
2630
2631 return nsent;
2632}
2633
2634////////////////////////////////////////////////////////////////////////////////
2635/// Broadcast file to all workers in the specified list. Returns the number of workers
2636/// the buffer was sent to.
2637/// Returns -1 in case of error.
2638
2639Int_t TProof::BroadcastFile(const char *file, Int_t opt, const char *rfile, ESlaves list)
2640{
2641 TList *wrks = 0;
2642 if (list == kAll) wrks = fSlaves;
2643 if (list == kActive) wrks = fActiveSlaves;
2644 if (list == kUnique) wrks = fUniqueSlaves;
2645 if (list == kAllUnique) wrks = fAllUniqueSlaves;
2646
2647 return BroadcastFile(file, opt, rfile, wrks);
2648}
2649
2650////////////////////////////////////////////////////////////////////////////////
2651/// Release a monitor used for a collection. 'mon' is deleted unless it is 0 or
2652/// one of the member monitors (fAllMonitor, fActiveMonitor, fUniqueMonitor,
/// fAllUniqueMonitor). Only temporary monitors, such as those allocated by the
/// Collect() overloads, are therefore freed.
2653
2655{
2656 if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2657 && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2658 delete mon;
2659 }
2660}
2661
2662////////////////////////////////////////////////////////////////////////////////
2663/// Collect responses from slave sl. Returns the number of slaves that
2664/// responded (=1).
2665/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2666/// which means wait forever).
2667/// If defined (>= 0) endtype is the message that stops this collection.
/// Returns 0 without collecting if 'sl' is not valid. Otherwise the result is
/// that of Collect(TMonitor *, ...), which counts received messages.
/// NOTE(review): 'sl' is dereferenced without a null check.
2668
2669Int_t TProof::Collect(const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2670{
2671 Int_t rc = 0;
2672
2673 TMonitor *mon = 0;
2674 if (!sl->IsValid()) return 0;
2675
// Either allocate a private monitor or reuse fAllMonitor with all its sockets
// deactivated. ReleaseMonitor() below deletes the monitor only in the former case.
2677 mon = new TMonitor;
2678 } else {
2679 mon = fAllMonitor;
2680 mon->DeActivateAll();
2681 }
2682 mon->Activate(sl->GetSocket());
2683
2684 rc = Collect(mon, timeout, endtype, deactonfail);
2685 ReleaseMonitor(mon);
2686 return rc;
2687}
2688
2689////////////////////////////////////////////////////////////////////////////////
2690/// Collect responses from the slave servers. Returns the number of slaves
2691/// that responded.
2692/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2693/// which means wait forever).
2694/// If defined (>= 0) endtype is the message that stops this collection.
/// Only the sockets of the valid workers in 'slaves' are monitored. The value
/// returned is that of Collect(TMonitor *, ...), which counts received messages.
2695
2696Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2697{
2698 Int_t rc = 0;
2699
2700 TMonitor *mon = 0;
2701
// Either allocate a private monitor or reuse fAllMonitor with all its sockets
// deactivated. ReleaseMonitor() below deletes the monitor only in the former case.
2703 mon = new TMonitor;
2704 } else {
2705 mon = fAllMonitor;
2706 mon->DeActivateAll();
2707 }
2708 TIter next(slaves);
2709 TSlave *sl;
2710 while ((sl = (TSlave*) next())) {
2711 if (sl->IsValid())
2712 mon->Activate(sl->GetSocket());
2713 }
2714
2715 rc = Collect(mon, timeout, endtype, deactonfail);
2716 ReleaseMonitor(mon);
2717 return rc;
2718}
2719
2720////////////////////////////////////////////////////////////////////////////////
2721/// Collect responses from the slave servers. Returns the number of slaves
2722/// that responded.
2723/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2724/// which means wait forever).
2725/// If defined (>= 0) endtype is the message that stops this collection.
2726
2727Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2728{
2729 Int_t rc = 0;
2730 TMonitor *mon = 0;
2731
2732 if (list == kAll) mon = fAllMonitor;
2733 if (list == kActive) mon = fActiveMonitor;
2734 if (list == kUnique) mon = fUniqueMonitor;
2735 if (list == kAllUnique) mon = fAllUniqueMonitor;
2736 if (fCurrentMonitor == mon) {
2737 // Get a copy
2738 mon = new TMonitor(*mon);
2739 }
2740 mon->ActivateAll();
2741
2742 rc = Collect(mon, timeout, endtype, deactonfail);
2743 ReleaseMonitor(mon);
2744 return rc;
2745}
2746
2747////////////////////////////////////////////////////////////////////////////////
2748/// Collect responses from the slave servers. Returns the number of messages
2749/// received. Can be 0 if there are no active slaves.
2750/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2751/// which means wait forever).
2752/// If defined (>= 0) endtype is the message that stops this collection.
2753/// Collect also stops its execution from time to time to check for new
2754/// workers in Dynamic Startup mode.
///
/// Implementation notes:
///  - 'timeout' is decremented once for each Select(1000) call that returns
///    (TSocket *)-1 (presumably a timeout), so it is effectively measured in
///    ~1 s units of inactivity.
///  - Only messages for which CollectInputFrom() returns >= 0 are counted.
///  - A socket is deactivated in 'mon' when CollectInputFrom() returns 1, or
///    when it returns 2 and no outer collection is in progress. During a nested
///    collection, a return of 2 deactivates the socket in the saved (outer)
///    monitor instead.
///  - fCurrentMonitor and fRedirLog are saved on entry and restored on exit.
///    fBytesRead, fRealTime and fCpuTime are reset only for an outermost
///    collection.
2755
2756Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2757{
2758 Int_t collectId = gRandom->Integer(9999);
2759
2760 PDB(kCollect, 3)
2761 Info("Collect", ">>>>>> Entering collect responses #%04d", collectId);
2762
2763 // Reset the status flag and clear the messages in the list, if any
2764 fStatus = 0;
2766
// Socket-activity timeout: the configured value times 1000 (presumably s -> ms)
2767 Long_t actto = (Long_t)(gEnv->GetValue("Proof.SocketActivityTimeout", -1) * 1000);
2768
// Nothing to collect: return before fCurrentMonitor or fRedirLog are touched
2769 if (!mon->GetActive(actto)) return 0;
2770
2772
2773 // Used by external code to know what we are monitoring
2774 TMonitor *savedMonitor = 0;
2775 if (fCurrentMonitor) {
2776 savedMonitor = fCurrentMonitor;
2777 fCurrentMonitor = mon;
2778 } else {
2779 fCurrentMonitor = mon;
2780 fBytesRead = 0;
2781 fRealTime = 0.0;
2782 fCpuTime = 0.0;
2783 }
2784
2785 // We want messages on the main window during synchronous collection,
2786 // but we save the present status to restore it at the end
2787 Bool_t saveRedirLog = fRedirLog;
2788 if (!IsIdle() && !IsSync())
2789 fRedirLog = kFALSE;
2790
2791 int cnt = 0, rc = 0;
2792
2793 // Timeout counter
2794 Long_t nto = timeout;
2795 PDB(kCollect, 2)
2796 Info("Collect","#%04d: active: %d", collectId, mon->GetActive());
2797
2798 // On clients, handle Ctrl-C during collection
2799 if (fIntHandler)
2800 fIntHandler->Add();
2801
2802 // Sockets w/o activity during the last 'sto' millisecs are deactivated
2803 Int_t nact = 0;
2804 Long_t sto = -1;
2805 Int_t nsto = 60;
2806 Int_t pollint = gEnv->GetValue("Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2807 mon->ResetInterrupt();
// Loop while sockets are active and the timeout counter is not exhausted
// (nto < 0 means no timeout)
2808 while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2809
2810 // Dump last waiting sockets, if in debug mode
2811 PDB(kCollect, 2) {
2812 if (nact < 4) {
2813 TList *al = mon->GetListOfActives();
2814 if (al && al->GetSize() > 0) {
2815 Info("Collect"," %d node(s) still active:", al->GetSize());
2816 TIter nxs(al);
2817 TSocket *xs = 0;
2818 while ((xs = (TSocket *)nxs())) {
2819 TSlave *wrk = FindSlave(xs);
2820 if (wrk)
2821 Info("Collect"," %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2822 else
2823 Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2824 xs->GetInetAddress().GetPort());
2825 }
2826 }
2827 delete al;
2828 }
2829 }
2830
2831 // Preemptive poll for new workers on the master only in Dynamic Mode and only
2832 // during processing (TODO: should work on Top Master only)
2834 ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2837 fLastPollWorkers_s = time(0);
2839 PDB(kCollect, 1)
2840 Info("Collect","#%04d: now active: %d", collectId, mon->GetActive());
2841 }
2842
2843 // Wait for a ready socket
2844 PDB(kCollect, 3)
2845 Info("Collect", "Will invoke Select() #%04d", collectId);
2846 TSocket *s = mon->Select(1000);
2847
2848 if (s && s != (TSocket *)(-1)) {
2849 // Get and analyse the info it did receive
2850 rc = CollectInputFrom(s, endtype, deactonfail);
2851 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2852 // Deactivate it if we are done with it
2853 mon->DeActivate(s);
2854 TList *al = mon->GetListOfActives();
2855 PDB(kCollect, 2)
2856 Info("Collect","#%04d: deactivating %p (active: %d, %p)", collectId,
2857 s, mon->GetActive(),
2858 al->First());
2859 delete al;
2860 } else if (rc == 2) {
2861 // This end message was for the saved monitor
2862 // Deactivate it if we are done with it
2863 if (savedMonitor) {
2864 savedMonitor->DeActivate(s);
// NOTE(review): the debug print below reports savedMonitor->GetActive() but
// takes First() from the active list of 'mon', not of savedMonitor
// -- confirm which monitor was intended.
2865 TList *al = mon->GetListOfActives();
2866 PDB(kCollect, 2)
2867 Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2868 s, savedMonitor->GetActive(),
2869 al->First());
2870 delete al;
2871 }
2872 }
2873
2874 // Update counter (if no error occurred)
2875 if (rc >= 0)
2876 cnt++;
2877 } else {
2878 // If not timed-out, exit if not stopped or not aborted
2879 // (player exits status is finished in such a case); otherwise,
2880 // we still need to collect the partial output info
2881 if (!s)
2883 mon->DeActivateAll();
2884 // Decrease the timeout counter if requested
2885 if (s == (TSocket *)(-1) && nto > 0)
2886 nto--;
2887 }
2888
2889 // Check if there are workers with ready output to be sent and ask at most 'mxws' of them to send it
2891 // Maximum number of concurrent sendings
2892 Int_t mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2893 if (TProof::GetParameter(fPlayer->GetInputList(), "PROOF_ControlSendOutput", mxws) != 0)
2894 mxws = gEnv->GetValue("Proof.ControlSendOutput", 1);
2895 TIter nxwr(fWrksOutputReady);
2896 TSlave *wrk = 0;
2897 while (mxws && (wrk = (TSlave *) nxwr())) {
2898 if (!wrk->TestBit(TSlave::kOutputRequested)) {
2899 // Ask worker for output
2900 TMessage sendoutput(kPROOF_SENDOUTPUT);
2901 PDB(kCollect, 2)
2902 Info("Collect", "worker %s was asked to send its output to master",
2903 wrk->GetOrdinal());
// NOTE(review): if TSocket::Send() returns the number of bytes sent (or -1 on
// error), '!= 1' holds for both success and failure -- confirm the intent.
2904 if (wrk->GetSocket()->Send(sendoutput) != 1) {
2906 mxws--;
2907 }
2908 } else {
2909 // Count
2910 mxws--;
2911 }
2912 }
2913 }
2914
2915 // Check socket activity every 60 loop iterations (each idle iteration waits up to 1000 ms in Select())
2916 sto = -1;
2917 if (--nsto <= 0) {
2918 sto = (Long_t) actto;
2919 nsto = 60;
2920 }
2921
2922 } // end loop over active monitors
2923
2924 // If timed-out, deactivate the remaining sockets
2925 if (nto == 0) {
2926 TList *al = mon->GetListOfActives();
2927 if (al && al->GetSize() > 0) {
2928 // Notify the name of those which did timeout
2929 Info("Collect"," %d node(s) went in timeout:", al->GetSize());
2930 TIter nxs(al);
2931 TSocket *xs = 0;
2932 while ((xs = (TSocket *)nxs())) {
2933 TSlave *wrk = FindSlave(xs);
2934 if (wrk)
2935 Info("Collect"," %s", wrk->GetName());
2936 else
2937 Info("Collect"," %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2938 xs->GetInetAddress().GetPort());
2939 }
2940 }
2941 delete al;
2942 mon->DeActivateAll();
2943 }
2944
2945 // Deactivate Ctrl-C special handler
2946 if (fIntHandler)
2948
2949 // make sure group view is up to date
2950 SendGroupView();
2951
2952 // Restore redirection setting
2953 fRedirLog = saveRedirLog;
2954
2955 // Restore the monitor
2956 fCurrentMonitor = savedMonitor;
2957
2959
2960 PDB(kCollect, 3)
2961 Info("Collect", "<<<<<< Exiting collect responses #%04d", collectId);
2962
2963 return cnt;
2964}
2965
2966////////////////////////////////////////////////////////////////////////////////
2967/// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2968/// them. Returns the number of new workers found, or <0 on errors.
/// Returns -1 when not on a master (TProof::kIsMaster) or when gProofServ is
/// not set. Node infos whose full ordinal ("<master ordinal>.<node ordinal>")
/// already appears in fSlaves are deleted. The others are passed to
/// AddWorkers(), and -1 is returned if that call fails.
2969
2971{
2972 // Requests for worker updates
2973 Int_t dummy = 0;
// NOTE(review): reqWorkers is allocated before the two sanity checks below,
// so it is leaked when either of them returns -1.
2974 TList *reqWorkers = new TList();
2975 reqWorkers->SetOwner(kFALSE);
2976
2977 if (!TestBit(TProof::kIsMaster)) {
2978 Error("PollForNewWorkers", "Can't invoke: not on a master -- should not happen!");
2979 return -1;
2980 }
2981 if (!gProofServ) {
2982 Error("PollForNewWorkers", "No ProofServ available -- should not happen!");
2983 return -1;
2984 }
2985
2986 gProofServ->GetWorkers(reqWorkers, dummy, kTRUE); // last 2 are dummy
2987
2988 // List of new workers only (TProofNodeInfo)
2989 TList *newWorkers = new TList();
2990 newWorkers->SetOwner(kTRUE);
2991
2992 TIter next(reqWorkers);
2993 TProofNodeInfo *ni;
2994 TString fullOrd;
2995 while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2996
2997 // Form the full ordinal
2998 fullOrd.Form("%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2999
// Linear search of the known workers for the same full ordinal
3000 TIter nextInner(fSlaves);
3001 TSlave *sl;
3002 Bool_t found = kFALSE;
3003 while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
3004 if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
3005 found = kTRUE;
3006 break;
3007 }
3008 }
3009
// Each node info is either deleted (already known) or handed over to the
// owning newWorkers list
3010 if (found) delete ni;
3011 else {
3012 newWorkers->Add(ni);
3013 PDB(kGlobal, 1)
3014 Info("PollForNewWorkers", "New worker found: %s:%s",
3015 ni->GetNodeName().Data(), fullOrd.Data());
3016 }
3017 }
3018
3019 delete reqWorkers; // not owner
3020
3021 Int_t nNewWorkers = newWorkers->GetEntries();
3022
3023 // Add the new workers
3024 if (nNewWorkers > 0) {
3025 PDB(kGlobal, 1)
3026 Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
3027 Int_t rv = AddWorkers(newWorkers);
3028 if (rv < 0) {
3029 Error("PollForNewWorkers", "Call to AddWorkers() failed (got %d < 0)", rv);
3030 return -1;
3031 }
3032 // Don't delete newWorkers: AddWorkers() will do that
3033 }
3034 else {
3035 PDB(kGlobal, 2)
3036 Info("PollForNewWorkers", "No new worker found");
3037 delete newWorkers;
3038 }
3039
3040 return nNewWorkers;
3041}
3042
3043////////////////////////////////////////////////////////////////////////////////
3044/// Remove links to objects in list 'ol' from gDirectory.
/// gDirectory->RecursiveRemove() is called for every object in 'ol'; this
/// function does not delete the objects itself. A null 'ol' is ignored.
3045
3047{
3048 if (ol) {
3049 TIter nxo(ol);
3050 TObject *o = 0;
3051 while ((o = nxo()))
3052 gDirectory->RecursiveRemove(o);
3053 }
3054}
3055
3056////////////////////////////////////////////////////////////////////////////////
3057/// Collect and analyze available input from socket s.
3058/// Returns -1 if Recv() fails or yields no message. In that case the socket is
/// marked bad, unless a reconnection after a broken connection (Recv() == -5)
/// succeeded. Otherwise it returns the value of HandleInputMessage(): 0 on
/// success, 1 for the last message from this socket, -1 on failure. A result
/// of 1 is turned into 2 when endtype >= 0 and the message type differs from
/// endtype, meaning the message belongs to the outer (saved) monitor of a
/// recursive collection.
3059
3061{
3062 TMessage *mess;
3063
3064 Int_t recvrc = 0;
3065 if ((recvrc = s->Recv(mess)) < 0) {
3066 PDB(kCollect,2)
3067 Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3068 Bool_t bad = kTRUE;
3069 if (recvrc == -5) {
3070 // Broken connection: try reconnection
3072 if (s->Reconnect() == 0) {
3074 bad = kFALSE;
3075 }
3076 }
3077 if (bad)
3078 MarkBad(s, "problems receiving a message in TProof::CollectInputFrom(...)");
3079 // Ignore this wake up
3080 return -1;
3081 }
3082 if (!mess) {
3083 // we get here in case the remote server died
3084 MarkBad(s, "undefined message in TProof::CollectInputFrom(...)");
3085 return -1;
3086 }
3087 Int_t rc = 0;
3088
// Read the type before handing the message over: HandleInputMessage()
// presumably may delete 'mess' (it tracks a delete_mess flag).
3089 Int_t what = mess->What();
// FindSlave() may return 0; HandleInputMessage() then warns and returns -1
3090 TSlave *sl = FindSlave(s);
3091 rc = HandleInputMessage(sl, mess, deactonfail);
3092 if (rc == 1 && (endtype >= 0) && (what != endtype))
3093 // This message was for the base monitor in recursive case
3094 rc = 2;
3095
3096 // We are done successfully
3097 return rc;
3098}
3099
3100////////////////////////////////////////////////////////////////////////////////
3101/// Analyze the received message.
3102/// Returns 0 on success (1 if this the last message from this socket), -1 if
3103/// any failure occurs.
3104
3106{
3107 char str[512];
3108 TObject *obj;
3109 Int_t rc = 0;
3110
3111 if (!mess || !sl) {
3112 Warning("HandleInputMessage", "given an empty message or undefined worker");
3113 return -1;
3114 }
3115 Bool_t delete_mess = kTRUE;
3116 TSocket *s = sl->GetSocket();
3117 if (!s) {
3118 Warning("HandleInputMessage", "worker socket is undefined");
3119 return -1;
3120 }
3121
3122 // The message type
3123 Int_t what = mess->What();
3124
3125 PDB(kCollect,3)
3126 Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3127
3128 switch (what) {
3129
3130 case kMESS_OK:
3131 // Add the message to the list
3132 fRecvMessages->Add(mess);
3133 delete_mess = kFALSE;
3134 break;
3135
3136 case kMESS_OBJECT:
3137 if (fPlayer) fPlayer->HandleRecvHisto(mess);
3138 break;
3139
3140 case kPROOF_FATAL:
3141 { TString msg;
3142 if ((mess->BufferSize() > mess->Length()))
3143 (*mess) >> msg;
3144 if (msg.IsNull()) {
3145 MarkBad(s, "received kPROOF_FATAL");
3146 } else {
3147 MarkBad(s, msg);
3148 }
3149 }
3151 // Finalize the progress dialog
3152 Emit("StopProcess(Bool_t)", kTRUE);
3153 }
3154 break;
3155
3156 case kPROOF_STOP:
3157 // Stop collection from this worker
3158 Info("HandleInputMessage", "received kPROOF_STOP from %s: disabling any further collection this worker",
3159 sl->GetOrdinal());
3160 rc = 1;
3161 break;
3162
3164 // Add the message to the list
3165 fRecvMessages->Add(mess);
3166 delete_mess = kFALSE;
3167 rc = 1;
3168 break;
3169
3170 case kPROOF_TOUCH:
3171 // send a request for touching the remote admin file
3172 {
3173 sl->Touch();
3174 }
3175 break;
3176
3177 case kPROOF_GETOBJECT:
3178 // send slave object it asks for
3179 mess->ReadString(str, sizeof(str));
3180 obj = gDirectory->Get(str);
3181 if (obj)
3182 s->SendObject(obj);
3183 else
3184 s->Send(kMESS_NOTOK);
3185 break;
3186
3187 case kPROOF_GETPACKET:
3188 {
3189 PDB(kGlobal,2)
3190 Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3191 TDSetElement *elem = 0;
3192 elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3193
3194 if (elem != (TDSetElement*) -1) {
3196 answ << elem;
3197 s->Send(answ);
3198
3199 while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3200 TPair *p = (TPair*) fWaitingSlaves->First();
3201 s = (TSocket*) p->Key();
3202 TMessage *m = (TMessage*) p->Value();
3203
3204 elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3205 if (elem != (TDSetElement*) -1) {
3207 a << elem;
3208 s->Send(a);
3209 // remove has to happen via Links because TPair does not have
3210 // a Compare() function and therefore RemoveFirst() and
3211 // Remove(TObject*) do not work
3213 delete p;
3214 delete m;
3215 } else {
3216 break;
3217 }
3218 }
3219 } else {
3220 if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
3221 fWaitingSlaves->Add(new TPair(s, mess));
3222 delete_mess = kFALSE;
3223 }
3224 }
3225 break;
3226
3227 case kPROOF_LOGFILE:
3228 {
3229 Int_t size;
3230 (*mess) >> size;
3231 PDB(kGlobal,2)
3232 Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3233 RecvLogFile(s, size);
3234 }
3235 break;
3236
3237 case kPROOF_LOGDONE:
3238 (*mess) >> sl->fStatus >> sl->fParallel;
3239 PDB(kCollect,2)
3240 Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3241 sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3242 if (sl->fStatus != 0) {
3243 // Return last nonzero status
3244 fStatus = sl->fStatus;
3245 // Deactivate the worker, if required
3246 if (deactonfail) DeactivateWorker(sl->fOrdinal);
3247 }
3248 // Remove from the workers-ready list
3252 }
3253 rc = 1;
3254 break;
3255
3256 case kPROOF_GETSTATS:
3257 {
3258 (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3259 >> sl->fWorkDir >> sl->fProofWorkDir;
3260 PDB(kCollect,2)
3261 Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3262 TString img;
3263 if ((mess->BufferSize() > mess->Length()))
3264 (*mess) >> img;
3265 // Set image
3266 if (img.IsNull()) {
3267 if (sl->fImage.IsNull())
3268 sl->fImage.Form("%s:%s", TUrl(sl->fName).GetHostFQDN(),
3269 sl->fProofWorkDir.Data());
3270 } else {
3271 sl->fImage = img;
3272 }
3273 PDB(kGlobal,2)
3274 Info("HandleInputMessage",
3275 "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3276
3277 fBytesRead += sl->fBytesRead;
3278 fRealTime += sl->fRealTime;
3279 fCpuTime += sl->fCpuTime;
3280 rc = 1;
3281 }
3282 break;
3283
3284 case kPROOF_GETPARALLEL:
3285 {
3286 Bool_t async = kFALSE;
3287 (*mess) >> sl->fParallel;
3288 if ((mess->BufferSize() > mess->Length()))
3289 (*mess) >> async;
3290 rc = (async) ? 0 : 1;
3291 }
3292 break;
3293
3294 case kPROOF_CHECKFILE:
3295 { // New servers (>= 5.22) send the status
3296 if ((mess->BufferSize() > mess->Length())) {
3297 (*mess) >> fCheckFileStatus;
3298 } else {
3299 // Form old servers this meant success (failure was signaled with the
3300 // dangerous kPROOF_FATAL)
3301 fCheckFileStatus = 1;
3302 }
3303 rc = 1;
3304 }
3305 break;
3306
3307 case kPROOF_SENDFILE:
3308 { // New server: signals ending of sendfile operation
3309 rc = 1;
3310 }
3311 break;
3312
3314 {
3315 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3316 Int_t type = 0;
3317 (*mess) >> type;
3318 switch (type) {
3322 if (fEnabledPackages) {
3324 } else {
3325 Error("HandleInputMessage",
3326 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3327 }
3328 break;
3332 if (fAvailablePackages) {
3334 } else {
3335 Error("HandleInputMessage",
3336 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3337 }
3338 break;
3339 default:
3340 Error("HandleInputMessage", "kPROOF_PACKAGE_LIST: unknown type: %d", type);
3341 }
3342 }
3343 break;
3344
3345 case kPROOF_SENDOUTPUT:
3346 {
3347 // We start measuring the merging time
3349
3350 // Worker is ready to send output: make sure the relevant bit is reset
3352 PDB(kGlobal,2)
3353 Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3354 // Create the list if not yet done
3355 if (!fWrksOutputReady) {
3356 fWrksOutputReady = new TList;
3358 }
3359 fWrksOutputReady->Add(sl);
3360 }
3361 break;
3362
3364 {
3365 // We start measuring the merging time
3367
3368 PDB(kGlobal,2)
3369 Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3370 Int_t type = 0;
3371 const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3373 Info("HandleInputMessage", "finalization on %s started ...", prefix);
3375 }
3376
3377 while ((mess->BufferSize() > mess->Length())) {
3378 (*mess) >> type;
3379 // If a query result header, add it to the player list
3380 if (fPlayer) {
3381 if (type == 0) {
3382 // Retrieve query result instance (output list not filled)
3383 TQueryResult *pq =
3385 if (pq) {
3386 // Add query to the result list in TProofPlayer
3389 // And clear the output list, as we start merging a new set of results
3390 if (fPlayer->GetOutputList())
3392 // Add the unique query tag as TNamed object to the input list
3393 // so that it is available in TSelectors for monitoring
3394 TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
3395 if (fPlayer->GetInputList()->FindObject("PROOF_QueryTag"))
3396 fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject("PROOF_QueryTag"));
3397 fPlayer->AddInput(new TNamed("PROOF_QueryTag", qid.Data()));
3398 } else {
3399 Warning("HandleInputMessage","kPROOF_OUTPUTOBJECT: query result missing");
3400 }
3401 } else if (type > 0) {
3402 // Read object
3403 TObject *o = mess->ReadObject(TObject::Class());
3404 // Increment counter on the client side
3406 TString msg;
3407 Bool_t changed = kFALSE;
3408 msg.Form("%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3409 if (gProofServ) {
3411 } else if (IsTty() || changed) {
3412 fprintf(stderr, "%s\r", msg.Data());
3413 }
3414 // Add or merge it
3415 if ((fPlayer->AddOutputObject(o) == 1)) {
3416 // Remove the object if it has been merged
3417 SafeDelete(o);
3418 }
3419 if (type > 1) {
3420 // Update the merger progress info
3422 if (TestBit(TProof::kIsClient) && !IsLite()) {
3423 // In PROOFLite this has to be done once only in TProofLite::Process
3425 if (pq) {
3427 // Add input objects (do not override remote settings, if any)
3428 TObject *xo = 0;
3429 TIter nxin(fPlayer->GetInputList());
3430 // Servers prior to 5.28/00 do not create the input list in the TQueryResult
3431 if (!pq->GetInputList()) pq->SetInputList(new TList());
3432 while ((xo = nxin()))
3433 if (!pq->GetInputList()->FindObject(xo->GetName()))
3434 pq->AddInput(xo->Clone());
3435 // If the last object, notify the GUI that the result arrived
3436 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3437 }
3438 // Processing is over
3439 UpdateDialog();
3440 }
3441 }
3442 }
3443 } else {
3444 Warning("HandleInputMessage", "kPROOF_OUTPUTOBJECT: player undefined!");
3445 }
3446 }
3447 }
3448 break;
3449
3450 case kPROOF_OUTPUTLIST:
3451 {
3452 // We start measuring the merging time
3453
3454 PDB(kGlobal,2)
3455 Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3456 TList *out = 0;
3457 if (fPlayer) {
3459 if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3460 out = (TList *) mess->ReadObject(TList::Class());
3461 } else {
3462 TQueryResult *pq =
3464 if (pq) {
3465 // Add query to the result list in TProofPlayer
3468 // To avoid accidental cleanups from anywhere else
3469 // remove objects from gDirectory and clone the list
3470 out = pq->GetOutputList();
3471 CleanGDirectory(out);
3472 out = (TList *) out->Clone();
3473 // Notify the GUI that the result arrived
3474 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3475 } else {
3476 PDB(kGlobal,2)
3477 Info("HandleInputMessage",
3478 "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3479 }
3480 }
3481 if (out) {
3482 out->SetOwner();
3483 fPlayer->AddOutput(out); // Incorporate the list
3484 SafeDelete(out);
3485 } else {
3486 PDB(kGlobal,2)
3487 Info("HandleInputMessage",
3488 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3489 }
3490 } else {
3491 Warning("HandleInputMessage",
3492 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3493 }
3494 // On clients at this point processing is over
3495 if (TestBit(TProof::kIsClient) && !IsLite())
3496 UpdateDialog();
3497 }
3498 break;
3499
3500 case kPROOF_QUERYLIST:
3501 {
3502 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3503 (*mess) >> fOtherQueries >> fDrawQueries;
3504 if (fQueries) {
3505 fQueries->Delete();
3506 delete fQueries;
3507 fQueries = 0;
3508 }
3509 fQueries = (TList *) mess->ReadObject(TList::Class());
3510 }
3511 break;
3512
3513 case kPROOF_RETRIEVE:
3514 {
3515 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3516 TQueryResult *pq =
3518 if (pq && fPlayer) {
3520 // Notify the GUI that the result arrived
3521 QueryResultReady(TString::Format("%s:%s", pq->GetTitle(), pq->GetName()));
3522 } else {
3523 PDB(kGlobal,2)
3524 Info("HandleInputMessage",
3525 "kPROOF_RETRIEVE: query result missing or player undefined");
3526 }
3527 }
3528 break;
3529
3530 case kPROOF_MAXQUERIES:
3531 {
3532 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3533 Int_t max = 0;
3534
3535 (*mess) >> max;
3536 Printf("Number of queries fully kept remotely: %d", max);
3537 }
3538 break;
3539
3541 {
3542 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3543
3544 UInt_t tot = 0, done = 0;
3545 TString action;
3546 Bool_t st = kTRUE;
3547
3548 (*mess) >> action >> tot >> done >> st;
3549
3551 if (tot) {
3552 TString type = (action.Contains("submas")) ? "submasters"
3553 : "workers";
3554 Int_t frac = (Int_t) (done*100.)/tot;
3555 char msg[512] = {0};
3556 if (frac >= 100) {
3557 snprintf(msg, 512, "%s: OK (%d %s) \n",
3558 action.Data(),tot, type.Data());
3559 } else {
3560 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3561 action.Data(), done, tot, frac);
3562 }
3563 if (fSync)
3564 fprintf(stderr,"%s", msg);
3565 else
3566 NotifyLogMsg(msg, 0);
3567 }
3568 // Notify GUIs
3569 StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3570 } else {
3571
3572 // Just send the message one level up
3574 m << action << tot << done << st;
3576 }
3577 }
3578 break;
3579
3581 {
3582 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3583
3584 UInt_t tot = 0, done = 0;
3585 TString action;
3586 Bool_t st = kTRUE;
3587
3588 (*mess) >> action >> tot >> done >> st;
3589
3591 if (tot) {
3592 TString type = "files";
3593 Int_t frac = (Int_t) (done*100.)/tot;
3594 char msg[512] = {0};
3595 if (frac >= 100) {
3596 snprintf(msg, 512, "%s: OK (%d %s) \n",
3597 action.Data(),tot, type.Data());
3598 } else {
3599 snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
3600 action.Data(), done, tot, frac);
3601 }
3602 if (fSync)
3603 fprintf(stderr,"%s", msg);
3604 else
3605 NotifyLogMsg(msg, 0);
3606 }
3607 // Notify GUIs
3608 DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3609 } else {
3610
3611 // Just send the message one level up
3613 m << action << tot << done << st;
3615 }
3616 }
3617 break;
3618
3620 {
3621 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3622
3623 // For Proof-Lite this variable is the number of workers and is set
3624 // by the player
3625 if (!IsLite()) {
3626 fNotIdle = 1;
3628 }
3629
3630 // Redirect the output, if needed
3632
3633 // The signal is used on masters by XrdProofdProtocol to catch
3634 // the start of processing; on clients it allows to update the
3635 // progress dialog
3636 if (!TestBit(TProof::kIsMaster)) {
3637
3638 // This is the end of preparation
3639 fQuerySTW.Stop();
3641 PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3642
3643 TString selec;
3644 Int_t dsz = -1;
3645 Long64_t first = -1, nent = -1;
3646 (*mess) >> selec >> dsz >> first >> nent;
3647 // Start or reset the progress dialog
3648 if (!gROOT->IsBatch()) {
3649 if (fProgressDialog &&
3652 fProgressDialog->ExecPlugin(5, this,
3653 selec.Data(), dsz, first, nent);
3655 } else {
3656 ResetProgressDialog(selec, dsz, first, nent);
3657 }
3658 }
3660 }
3661 }
3662 }
3663 break;
3664
3665 case kPROOF_ENDINIT:
3666 {
3667 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3668
3670 if (fPlayer)
3672 }
3673 }
3674 break;
3675
3676 case kPROOF_SETIDLE:
3677 {
3678 PDB(kGlobal,2)
3679 Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3680
3681 // The session is idle
3682 if (IsLite()) {
3683 if (fNotIdle > 0) {
3684 fNotIdle--;
3685 PDB(kGlobal,2)
3686 Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3687 } else {
3688 Warning("HandleInputMessage",
3689 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3690 sl->GetOrdinal());
3691 }
3692 } else {
3693 fNotIdle = 0;
3694 // Check if the query has been enqueued
3695 if ((mess->BufferSize() > mess->Length()))
3696 (*mess) >> fIsWaiting;
3697 }
3698 }
3699 break;
3700
3702 {
3703 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3704
3705 // We have received the sequential number
3706 (*mess) >> fSeqNum;
3707 Bool_t sync = fSync;
3708 if ((mess->BufferSize() > mess->Length()))
3709 (*mess) >> sync;
3710 if (sync != fSync && fSync) {
3711 // The server required to switch to asynchronous mode
3712 Activate();
3713 fSync = kFALSE;
3714 }
3715 DisableGoAsyn();
3716 // Check if the query has been enqueued
3717 fIsWaiting = kTRUE;
3718 // For Proof-Lite this variable is the number of workers and is set by the player
3719 if (!IsLite())
3720 fNotIdle = 1;
3721
3722 rc = 1;
3723 }
3724 break;
3725
3726 case kPROOF_SESSIONTAG:
3727 {
3728 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3729
3730 // We have received the unique tag and save it as name of this object
3731 TString stag;
3732 (*mess) >> stag;
3733 SetName(stag);
3734 // In the TSlave object
3735 sl->SetSessionTag(stag);
3736 // Server may have also sent the group
3737 if ((mess->BufferSize() > mess->Length()))
3738 (*mess) >> fGroup;
3739 // Server may have also sent the user
3740 if ((mess->BufferSize() > mess->Length())) {
3741 TString usr;
3742 (*mess) >> usr;
3743 if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3744 }
3745 }
3746 break;
3747
3748 case kPROOF_FEEDBACK:
3749 {
3750 PDB(kGlobal,2)
3751 Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3752 TList *out = (TList *) mess->ReadObject(TList::Class());
3753 out->SetOwner();
3754 if (fPlayer)
3755 fPlayer->StoreFeedback(sl, out); // Adopts the list
3756 else
3757 // Not yet ready: stop collect asap
3758 rc = 1;
3759 }
3760 break;
3761
3762 case kPROOF_AUTOBIN:
3763 {
3764 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3765
3766 TString name;
3767 Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3768
3769 (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3770
3772
3774
3775 answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3776
3777 s->Send(answ);
3778 }
3779 break;
3780
3781 case kPROOF_PROGRESS:
3782 {
3783 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3784
3785 if (GetRemoteProtocol() > 25) {
3786 // New format
3788 (*mess) >> pi;
3789 fPlayer->Progress(sl,pi);
3790 } else if (GetRemoteProtocol() > 11) {
3791 Long64_t total, processed, bytesread;
3792 Float_t initTime, procTime, evtrti, mbrti;
3793 (*mess) >> total >> processed >> bytesread
3794 >> initTime >> procTime
3795 >> evtrti >> mbrti;
3796 if (fPlayer)
3797 fPlayer->Progress(sl, total, processed, bytesread,
3798 initTime, procTime, evtrti, mbrti);
3799
3800 } else {
3801 // Old format
3802 Long64_t total, processed;
3803 (*mess) >> total >> processed;
3804 if (fPlayer)
3805 fPlayer->Progress(sl, total, processed);
3806 }
3807 }
3808 break;
3809
3810 case kPROOF_STOPPROCESS:
3811 {
3812 // This message is sent from a worker that finished processing.
3813 // We determine whether it was asked to finish by the
3814 // packetizer or stopped during processing a packet
3815 // (by TProof::RemoveWorkers() or by an external signal).
3816 // In the later case call packetizer->MarkBad.
3817 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3818
3819 Long64_t events = 0;
3820 Bool_t abort = kFALSE;
3821 TProofProgressStatus *status = 0;
3822
3823 if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3824 (*mess) >> status >> abort;
3825 } else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3826 (*mess) >> events >> abort;
3827 } else {
3828 (*mess) >> events;
3829 }
3830 if (fPlayer) {
3831 if (fProtocol > 18) {
3832 TList *listOfMissingFiles = 0;
3833 if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
3834 listOfMissingFiles = new TList();
3835 listOfMissingFiles->SetName("MissingFiles");
3836 if (fPlayer)
3837 fPlayer->AddOutputObject(listOfMissingFiles);
3838 }
3839 if (fPlayer->GetPacketizer()) {
3840 Int_t ret =
3841 fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3842 if (ret > 0)
3843 fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3844 // This object is now owned by the packetizer
3845 status = 0;
3846 }
3847 if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3848 } else {
3849 fPlayer->AddEventsProcessed(events);
3850 }
3851 }
3852 SafeDelete(status);
3854 Emit("StopProcess(Bool_t)", abort);
3855 break;
3856 }
3857
3858 case kPROOF_SUBMERGER:
3859 {
3860 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3861 HandleSubmerger(mess, sl);
3862 }
3863 break;
3864
3866 {
3867 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3868
3869 Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3870 Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3871 TList* tmpinfo = 0;
3872 (*mess) >> tmpinfo;
3873 if (tmpinfo == 0) {
3874 Error("HandleInputMessage", "kPROOF_GETSLAVEINFO: no list received!");
3875 } else {
3876 tmpinfo->SetOwner(kFALSE);
3877 Int_t nentries = tmpinfo->GetSize();
3878 for (Int_t i=0; i<nentries; i++) {
3879 TSlaveInfo* slinfo =
3880 dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
3881 if (slinfo) {
3882 // If PROOF-Lite
3883 if (IsLite()) slinfo->fHostName = gSystem->HostName();
3884 // Check if we have already a instance for this worker
3885 TIter nxw(fSlaveInfo);
3886 TSlaveInfo *ourwi = 0;
3887 while ((ourwi = (TSlaveInfo *)nxw())) {
3888 if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3889 ourwi->SetSysInfo(slinfo->GetSysInfo());
3890 ourwi->fHostName = slinfo->GetName();
3891 if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3892 ourwi->fDataDir = slinfo->GetDataDir();
3893 break;
3894 }
3895 }
3896 if (!ourwi) {
3897 fSlaveInfo->Add(slinfo);
3898 } else {
3899 slinfo = ourwi;
3900 }
3901 if (slinfo->fStatus != TSlaveInfo::kBad) {
3902 if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3903 if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3904 }
3905 if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3906 slinfo->fMsd = sl->GetMsd();
3907 }
3908 }
3909 delete tmpinfo;
3910 rc = 1;
3911 }
3912 }
3913 break;
3914
3916 {
3917 PDB(kGlobal,2)
3918 Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3919 TDSet* dset = 0;
3920 (*mess) >> dset;
3921 if (!fDSet)
3922 Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3923 else
3924 fDSet->Validate(dset);
3925 delete dset;
3926 }
3927 break;
3928
3929 case kPROOF_DATA_READY:
3930 {
3931 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3932 Bool_t dataready = kFALSE;
3933 Long64_t totalbytes, bytesready;
3934 (*mess) >> dataready >> totalbytes >> bytesready;
3935 fTotalBytes += totalbytes;
3936 fBytesReady += bytesready;
3937 if (dataready == kFALSE) fDataReady = dataready;
3938 }
3939 break;
3940
3941 case kPROOF_PING:
3942 // do nothing (ping is already acknowledged)
3943 break;
3944
3945 case kPROOF_MESSAGE:
3946 {
3947 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3948
3949 // We have received the unique tag and save it as name of this object
3950 TString msg;
3951 (*mess) >> msg;
3952 Bool_t lfeed = kTRUE;
3953 if ((mess->BufferSize() > mess->Length()))
3954 (*mess) >> lfeed;
3955
3957
3958 if (fSync) {
3959 // Notify locally
3960 fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3961 } else {
3962 // Notify locally taking care of redirection, windows logs, ...
3963 NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
3964 }
3965 } else {
3966
3967 // The message is logged for debugging purposes.
3968 fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
3969 if (gProofServ) {
3970 // We hide it during normal operations
3972
3973 // And send the message one level up
3974 gProofServ->SendAsynMessage(msg, lfeed);
3975 }
3976 }
3977 }
3978 break;
3979
3981 {
3982 TString vac;
3983 (*mess) >> vac;
3984 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3985 Int_t from = 0;
3986 TString vers, archcomp;
3987 if (vac.Tokenize(vers, from, "|"))
3988 vac.Tokenize(archcomp, from, "|");
3989 sl->SetArchCompiler(archcomp);
3990 vers.ReplaceAll(":","|");
3991 sl->SetROOTVersion(vers);
3992 }
3993 break;
3994
3995 default:
3996 {
3997 Error("HandleInputMessage", "unknown command received from '%s' (what = %d)",
3998 sl->GetOrdinal(), what);
3999 }
4000 break;
4001 }
4002
4003 // Cleanup
4004 if (delete_mess)
4005 delete mess;
4006
4007 // We are done successfully
4008 return rc;
4009}
4010
4011////////////////////////////////////////////////////////////////////////////////
4012/// Process a message of type kPROOF_SUBMERGER
4013
4015{
4016 // Message sub-type
4017 Int_t type = 0;
4018 (*mess) >> type;
4019 TSocket *s = sl->GetSocket();
4020
4021 switch (type) {
4022 case kOutputSent:
4023 {
4024 if (IsEndMaster()) {
4025 Int_t merger_id = -1;
4026 (*mess) >> merger_id;
4027
4028 PDB(kSubmerger, 2)
4029 Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4030 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4031
4032 if (!fMergers || fMergers->GetSize() <= merger_id) {
4033 Error("HandleSubmerger", "kOutputSize: #%d not in list ", merger_id);
4034 break;
4035 }
4036 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4037 mi->SetMergedWorker();
4038 if (mi->AreAllWorkersMerged()) {
4039 mi->Deactivate();
4040 if (GetActiveMergersCount() == 0) {
4041 fMergers->Clear();
4042 delete fMergers;
4044 fMergersCount = -1;
4046 PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4047 }
4048 }
4049 } else {
4050 PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4051 }
4052 }
4053 break;
4054
4055 case kMergerDown:
4056 {
4057 Int_t merger_id = -1;
4058 (*mess) >> merger_id;
4059
4060 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown: #%d ", merger_id);
4061
4062 if (!fMergers || fMergers->GetSize() <= merger_id) {
4063 Error("HandleSubmerger", "kMergerDown: #%d not in list ", merger_id);
4064 break;
4065 }
4066
4067 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4068 if (!mi->IsActive()) {
4069 break;
4070 } else {
4071 mi->Deactivate();
4072 }
4073
4074 // Stop the invalid merger in the case it is still listening
4076 stop << Int_t(kStopMerging);
4077 stop << 0;
4078 s->Send(stop);
4079
4080 // Ask for results from merger (only original results from this node as worker are returned)
4081 AskForOutput(mi->GetMerger());
4082
4083 // Ask for results from all workers assigned to this merger
4084 TIter nxo(mi->GetWorkers());
4085 TObject * o = 0;
4086 while ((o = nxo())) {
4087 AskForOutput((TSlave *)o);
4088 }
4089 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4090 }
4091 break;
4092
4093 case kOutputSize:
4094 {
4095 if (IsEndMaster()) {
4096 PDB(kSubmerger, 2)
4097 Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4098
4099 const char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4100 if (!fFinalizationRunning) {
4101 Info("HandleSubmerger", "finalization on %s started ...", prefix);
4103 }
4104
4105 Int_t output_size = 0;
4106 Int_t merging_port = 0;
4107 (*mess) >> output_size >> merging_port;
4108
4109 PDB(kSubmerger, 2) Info("HandleSubmerger",
4110 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4111 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4112 TString msg;
4113 if (!fMergersSet) {
4114
4116
4117 // First pass - setting number of mergers according to user or dynamically
4118 fMergersCount = -1; // No mergers used if not set by user
4119 TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4120 if (mc) fMergersCount = mc->GetVal(); // Value set by user
4121 TParameter<Int_t> *mh = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_MergersByHost"));
4122 if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE; // Assign submergers by hostname
4123
4124 // Mergers count specified by user but not valid
4125 if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4126 msg.Form("%s: Invalid request: cannot start %d mergers for %d workers",
4127 prefix, fMergersCount, activeWorkers);
4128 if (gProofServ)
4130 else
4131 Printf("%s",msg.Data());
4132 fMergersCount = 0;
4133 }
4134 // Mergers count will be set dynamically
4135 if ((fMergersCount == 0) && (!fMergersByHost)) {
4136 if (activeWorkers > 1) {
4137 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4138 if (activeWorkers / fMergersCount < 2)
4139 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4140 }
4141 if (fMergersCount > 1)
4142 msg.Form("%s: Number of mergers set dynamically to %d (for %d workers)",
4143 prefix, fMergersCount, activeWorkers);
4144 else {
4145 msg.Form("%s: No mergers will be used for %d workers",
4146 prefix, activeWorkers);
4147 fMergersCount = -1;
4148 }
4149 if (gProofServ)
4151 else
4152 Printf("%s",msg.Data());
4153 } else if (fMergersByHost) {
4154 // We force mergers at host level to minimize network traffic
4155 if (activeWorkers > 1) {
4156 fMergersCount = 0;
4157 THashList hosts;
4158 TIter nxwk(fSlaves);
4159 TObject *wrk = 0;
4160 while ((wrk = nxwk())) {
4161 if (!hosts.FindObject(wrk->GetName())) {
4162 hosts.Add(new TObjString(wrk->GetName()));
4163 fMergersCount++;
4164 }
4165 }
4166 }
4167 if (fMergersCount > 1)
4168 msg.Form("%s: Number of mergers set to %d (for %d workers), one for each slave host",
4169 prefix, fMergersCount, activeWorkers);
4170 else {
4171 msg.Form("%s: No mergers will be used for %d workers",
4172 prefix, activeWorkers);
4173 fMergersCount = -1;
4174 }
4175 if (gProofServ)
4177 else
4178 Printf("%s",msg.Data());
4179 } else {
4180 msg.Form("%s: Number of mergers set by user to %d (for %d workers)",
4181 prefix, fMergersCount, activeWorkers);
4182 if (gProofServ)
4184 else
4185 Printf("%s",msg.Data());
4186 }
4187
4188 // We started merging; we call it here because fMergersCount is still the original number
4189 // and can be saved internally
4191
4192 // Update merger counters (new workers are not yet active)
4194
4195 if (fMergersCount > 0) {
4196
4197 fMergers = new TList();
4199 // Total number of workers, which will not act as mergers ('pure workers')
4200 fWorkersToMerge = (activeWorkers - fMergersCount);
4201 // Establish the first merger
4202 if (!CreateMerger(sl, merging_port)) {
4203 // Cannot establish first merger
4204 AskForOutput(sl);
4206 fMergersCount--;
4207 }
4209 } else {
4210 AskForOutput(sl);
4211 }
4213 } else {
4214 // Multiple pass
4215 if (fMergersCount == -1) {
4216 // No mergers. Workers send their outputs directly to master
4217 AskForOutput(sl);
4218 } else {
4219 if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4220 RedirectWorker(s, sl, output_size);
4221 fRedirectNext--;
4222 } else {
4223 Bool_t newMerger = kTRUE;
4224 if (fMergersByHost) {
4225 TIter nxmg(fMergers);
4226 TMergerInfo *mgi = 0;
4227 while ((mgi = (TMergerInfo *) nxmg())) {
4228 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4229 newMerger = kFALSE;
4230 break;
4231 }
4232 }
4233 }
4234 if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4235 // Still not enough mergers established
4236 if (!CreateMerger(sl, merging_port)) {
4237 // Cannot establish a merger
4238 AskForOutput(sl);
4240 fMergersCount--;
4241 }
4242 } else
4243 RedirectWorker(s, sl, output_size);
4244 }
4245 }
4246 }
4247 } else {
4248 Error("HandleSubMerger","kOutputSize received not on endmaster!");
4249 }
4250 }
4251 break;
4252 }
4253}
4254
4255////////////////////////////////////////////////////////////////////////////////
4256/// Redirect the output of worker sl to a merger, or ask it to send its output to the master if no merger is available
4257
4259{ // Tell worker 'sl' (over socket 's') which merger should receive its output of 'output_size' objects
4260 Int_t merger_id = -1; // -1 means "no suitable merger found"
4261
4262 if (fMergersByHost) { // NOTE(review): fMergers is dereferenced here without the null check performed further below
4263 for (Int_t i = 0; i < fMergers->GetSize(); i++) { // by-host mode: pick the merger whose host name matches sl's
4264 TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4265 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4266 merger_id = i;
4267 break;
4268 }
4269 }
4270 } else {
4271 merger_id = FindNextFreeMerger(); // round-robin choice among active mergers; -1 if none accepts workers
4272 }
4273
4274 if (merger_id == -1) {
4275 // No free merger (probably it had crashed before)
4276 AskForOutput(sl); // fallback: the worker sends its output directly to the master
4277 } else {
4278 TMessage sendoutput(kPROOF_SUBMERGER);
4279 sendoutput << Int_t(kSendOutput);
4280 PDB(kSubmerger, 2)
4281 Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4282
4283 PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger #%d", merger_id);
4284 if (!fMergers || fMergers->GetSize() <= merger_id) { // defensive: the index must refer to an existing merger
4285 Error("RedirectWorker", "#%d not in list ", merger_id);
4286 return;
4287 }
4288 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4289
4290 TString hname = (IsLite()) ? "localhost" : mi->GetMerger()->GetName(); // in PROOF-Lite the merger is addressed as localhost
4291 sendoutput << merger_id; // payload after the sub-type: merger index, merger host name, merger port
4292 sendoutput << hname;
4293 sendoutput << mi->GetPort();
4294 s->Send(sendoutput);
4295 mi->AddMergedObjects(output_size); // record on the merger the objects and the worker it now has to merge
4296 mi->AddWorker(sl);
4297 }
4298}
4299
4300////////////////////////////////////////////////////////////////////////////////
4301/// Return the index of a merger that is both active and still accepts
4302/// workers to be assigned to it, chosen on a round-robin basis (-1 if none).
4303
4305{
4306 while (fLastAssignedMerger < fMergers->GetSize() &&
4307 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4308 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4310 }
4311
4314 } else {
4315 return fLastAssignedMerger++;
4316 }
4317
4318 while (fLastAssignedMerger < fMergers->GetSize() &&
4319 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4320 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4322 }
4323
4325 return -1;
4326 } else {
4327 return fLastAssignedMerger++;
4328 }
4329}
4330
4331////////////////////////////////////////////////////////////////////////////////
4332/// Master asks for output from worker sl
4333
4335{
4336 TMessage sendoutput(kPROOF_SUBMERGER);
4337 sendoutput << Int_t(kSendOutput);
4338
4339 PDB(kSubmerger, 2) Info("AskForOutput",
4340 "worker %s was asked to send its output to master",
4341 sl->GetOrdinal());
4342
4343 sendoutput << -1;
4344 sendoutput << TString("master");
4345 sendoutput << -1;
4346 sl->GetSocket()->Send(sendoutput);
4348}
4349
4350////////////////////////////////////////////////////////////////////////////////
4351/// Final update of the progress dialog
4352
4354{
4355 if (!fPlayer) return;
4356
4357 // Handle abort ...
4359 if (fSync)
4360 Info("UpdateDialog",
4361 "processing was aborted - %lld events processed",
4363
4364 if (GetRemoteProtocol() > 11) {
4365 // New format
4366 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4367 } else {
4369 }
4370 Emit("StopProcess(Bool_t)", kTRUE);
4371 }
4372
4373 // Handle stop ...
4375 if (fSync)
4376 Info("UpdateDialog",
4377 "processing was stopped - %lld events processed",
4379
4380 if (GetRemoteProtocol() > 25) {
4381 // New format
4382 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4383 } else if (GetRemoteProtocol() > 11) {
4384 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4385 } else {
4387 }
4388 Emit("StopProcess(Bool_t)", kFALSE);
4389 }
4390
4391 // Final update of the dialog box
4392 if (GetRemoteProtocol() > 25) {
4393 // New format
4394 EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4395 10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4396 (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4397 } else if (GetRemoteProtocol() > 11) {
4398 // New format
4399 EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4400 7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4401 (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4402 } else {
4403 EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4404 }
4405}
4406
4407////////////////////////////////////////////////////////////////////////////////
4408/// Activate the async input handlers of all workers.
4409
4411{ // Call Add() on the input handler of every worker in fSlaves that has one
4412 TIter next(fSlaves);
4413 TSlave *sl;
4414
4415 while ((sl = (TSlave*) next()))
4416 if (sl->GetInputHandler()) // workers without an input handler are skipped
4417 sl->GetInputHandler()->Add();
4418}
4419
4420////////////////////////////////////////////////////////////////////////////////
4421/// De-activate the async input handlers of all workers.
4422
4424{ // Call Remove() on the input handler of every worker in fSlaves that has one
4425 TIter next(fSlaves);
4426 TSlave *sl;
4427
4428 while ((sl = (TSlave*) next()))
4429 if (sl->GetInputHandler()) // workers without an input handler are skipped
4430 sl->GetInputHandler()->Remove();
4431}
4432
4433////////////////////////////////////////////////////////////////////////////////
4434/// Get the active mergers count
4435
4437{ // Return how many entries of fMergers report IsActive()
4438 if (!fMergers) return 0; // no merger list yet: nothing is active
4439
4440 Int_t active_mergers = 0;
4441
4442 TIter mergers(fMergers);
4443 TMergerInfo *mi = 0;
4444 while ((mi = (TMergerInfo *)mergers())) {
4445 if (mi->IsActive()) active_mergers++; // mergers flagged inactive are not counted
4446 }
4447
4448 return active_mergers;
4449}
4450
4451////////////////////////////////////////////////////////////////////////////////
4452/// Create a new merger
4453
4455{
4456 PDB(kSubmerger, 2)
4457 Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4458
4459 PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4460
4461 if (port <= 0) {
4462 PDB(kSubmerger,2)
4463 Info("CreateMerger", "cannot create merger on port %d - exit", port);
4464 return kFALSE;
4465 }
4466
4467 Int_t workers = -1;
4468 if (!fMergersByHost) {
4469 Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4470 // Number of pure workers, which are not simply divisible by mergers
4471 Int_t rest = fWorkersToMerge % mergersToCreate;
4472 // We add one more worker for each of the first 'rest' mergers being established
4473 if (rest > 0 && fMergers->GetSize() < rest) {
4474 rest = 1;
4475 } else {
4476 rest = 0;
4477 }
4478 workers = (fWorkersToMerge / mergersToCreate) + rest;
4479 } else {
4480 Int_t workersOnHost = 0;
4481 for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4482 if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4483 }
4484 workers = workersOnHost - 1;
4485 }
4486
4487 TString msg;
4488 msg.Form("worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4489
4490 if (gProofServ) {
4492 } else {
4493 Printf("%s",msg.Data());
4494 }
4495 TMergerInfo * merger = new TMergerInfo(sl, port, workers);
4496
4497 TMessage bemerger(kPROOF_SUBMERGER);
4498 bemerger << Int_t(kBeMerger);
4499 bemerger << fMergers->GetSize();
4500 bemerger << workers;
4501 sl->GetSocket()->Send(bemerger);
4502
4503 PDB(kSubmerger,2) Info("CreateMerger",
4504 "merger #%d (port: %d) for %d workers started",
4505 fMergers->GetSize(), port, workers);
4506
4507 fMergers->Add(merger);
4508 fWorkersToMerge = fWorkersToMerge - workers;
4509
4510 fRedirectNext = workers / 2;
4511
4512 PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4513 return kTRUE;
4514}
4515
4516////////////////////////////////////////////////////////////////////////////////
4517/// Add a bad slave server to the bad slave list and remove it from
4518/// the active list and from the two monitor objects. Assume that the work
4519/// done by this worker was lost and ask the packetizer to reassign it.
4520
4521void TProof::MarkBad(TSlave *wrk, const char *reason)
4522{
4523 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4524
4525 // We may have been invalidated in the meanwhile: nothing to do in such a case
4526 if (!IsValid()) return;
4527
4528 if (!wrk) {
4529 Error("MarkBad", "worker instance undefined: protocol error? ");
4530 return;
4531 }
4532
4533 // Local URL
4534 static TString thisurl;
4535 if (thisurl.IsNull()) {
4536 if (IsMaster()) {
4537 Int_t port = gEnv->GetValue("ProofServ.XpdPort",-1);
4538 thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4539 if (port > 0) thisurl += TString::Format(":%d", port);
4540 } else {
4541 thisurl.Form("%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4542 }
4543 }
4544
4545 if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4546 // Message for notification
4547 const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ? "top master" : "master";
4548 TString src = IsMaster() ? Form("%s at %s", mastertype, thisurl.Data()) : "local session";
4549 TString msg;
4550 msg.Form("\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4551 src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4552 (reason && strlen(reason)) ? reason : "unknown");
4553 Info("MarkBad", "%s", msg.Data());
4554 // Notify one level up, if the case
4555 // Add some hint for diagnostics
4556 if (gProofServ) {
4557 msg += TString::Format("\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4558 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4559 } else {
4560 msg += TString::Format("\n\n +++ Most likely your code crashed\n");
4561 }
4562 msg += TString::Format(" +++ Please check the session logs for error messages either using\n");
4563 msg += TString::Format(" +++ the 'Show logs' button or executing\n");
4564 msg += TString::Format(" +++\n");
4565 if (gProofServ) {
4566 msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4567 "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
4569 } else {
4570 msg += TString::Format(" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4571 "Display(\"*\")\n\n", thisurl.Data());
4572 Printf("%s", msg.Data());
4573 }
4574 } else if (reason) {
4575 if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4576 Info("MarkBad", "worker %s at %s:%d asked to terminate",
4577 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4578 }
4579 }
4580
4581 if (IsMaster() && reason) {
4582 if (strcmp(reason, kPROOF_TerminateWorker)) {
4583 // if the reason was not a planned termination
4584 TList *listOfMissingFiles = 0;
4585 if (!(listOfMissingFiles = (TList *)GetOutput("MissingFiles"))) {
4586 listOfMissingFiles = new TList();
4587 listOfMissingFiles->SetName("MissingFiles");
4588 if (fPlayer)
4589 fPlayer->AddOutputObject(listOfMissingFiles);
4590 }
4591 // If a query is being processed, assume that the work done by
4592 // the worker was lost and needs to be reassigned.
4593 TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4594 if (packetizer) {
4595 // the worker was lost so do resubmit the packets
4596 packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4597 }
4598 } else {
4599 // Tell the coordinator that we are gone
4600 if (gProofServ) {
4601 TString ord(wrk->GetOrdinal());
4602 Int_t id = ord.Last('.');
4603 if (id != kNPOS) ord.Remove(0, id+1);
4605 }
4606 }
4607 } else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4608 // We are invalid after this
4609 fValid = kFALSE;
4610 }
4611
4612 fActiveSlaves->Remove(wrk);
4614
4615 fAllMonitor->Remove(wrk->GetSocket());
4617
4619
4620 if (IsMaster()) {
4621 if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4622 // if the reason was a planned termination then delete the worker and
4623 // remove it from all the lists
4624 fSlaves->Remove(wrk);
4625 fBadSlaves->Remove(wrk);
4626 fActiveSlaves->Remove(wrk);
4627 fInactiveSlaves->Remove(wrk);
4628 fUniqueSlaves->Remove(wrk);
4631
4632 // we add it to the list of terminated slave infos instead, so that it
4633 // stays available in the .workers persistent file
4634 TSlaveInfo *si = new TSlaveInfo(
4635 wrk->GetOrdinal(),
4636 Form("%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4637 0, "", wrk->GetWorkDir());
4639 else delete si;
4640
4641 delete wrk;
4642 } else {
4643 fBadSlaves->Add(wrk);
4644 fActiveSlaves->Remove(wrk);
4645 fUniqueSlaves->Remove(wrk);
4649 wrk->Close();
4650 // Update the mergers count, if needed
4651 if (fMergersSet) {
4652 Int_t mergersCount = -1;
4653 TParameter<Int_t> *mc = dynamic_cast<TParameter<Int_t> *>(GetParameter("PROOF_UseMergers"));
4654 if (mc) mergersCount = mc->GetVal(); // Value set by user
4655 // Mergers count is set dynamically: recalculate it
4656 if (mergersCount == 0) {
4658 if (activeWorkers > 1) {
4659 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4660 if (activeWorkers / fMergersCount < 2)
4661 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4662 }
4663 }
4664 }
4665 }
4666
4667 // Update session workers files
4669 } else {
4670 // On clients the proof session should be removed from the lists
4671 // and deleted, since it is not valid anymore
4672 fSlaves->Remove(wrk);
4673 if (fManager)
4674 fManager->DiscardSession(this);
4675 }
4676}
4677
4678////////////////////////////////////////////////////////////////////////////////
4679/// Add slave with socket s to the bad slave list and remove if from
4680/// the active list and from the two monitor objects.
4681
4682void TProof::MarkBad(TSocket *s, const char *reason)
4683{
4684 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4685
4686 // We may have been invalidated in the meanwhile: nothing to do in such a case
4687 if (!IsValid()) return;
4688
4689 TSlave *wrk = FindSlave(s);
4690 MarkBad(wrk, reason);
4691}
4692
4693////////////////////////////////////////////////////////////////////////////////
4694/// Ask an active worker 'wrk' to terminate, i.e. to shut down.
/// A kPROOF_STOP message is sent over the worker's socket when that socket
/// exists and is valid; otherwise the failure is only logged (when gDebug > 0).
/// A null `wrk` only produces a warning.
4695
// NOTE(review): listing line 4696, which should hold this function's signature, is
// missing here. The body uses a worker pointer named `wrk` and reports under the
// name "TerminateWorker"; confirm the exact signature against upstream TProof.cxx.
4697{
4698 if (!wrk) {
4699 Warning("TerminateWorker", "worker instance undefined: protocol error? ");
4700 return;
4701 }
4702
4703 // Send stop message
4704 if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4705 TMessage mess(kPROOF_STOP);
4706 wrk->GetSocket()->Send(mess);
4707 } else {
// Best effort: a dead connection is not an error here, just reported in debug mode
4708 if (gDebug > 0)
4709 Info("TerminateWorker", "connection to worker is already down: cannot"
4710 " send termination message");
4711 }
4712
4713 // This is a bad worker from now on
// NOTE(review): listing line 4714 is missing here; as shown nothing after the stop
// message actually marks the worker as bad despite the comment above -- presumably
// the dropped line did so. Confirm upstream.
4715}
4716
4717////////////////////////////////////////////////////////////////////////////////
4718/// Ask an active worker 'ord' to terminate, i.e. to shutdown
4719
4720void TProof::TerminateWorker(const char *ord)
4721{
4722 if (ord && strlen(ord) > 0) {
4723 Bool_t all = (ord[0] == '*') ? kTRUE : kFALSE;
4724 if (IsMaster()) {
4725 TIter nxw(fSlaves);