ROOT
master
Reference Guide
Loading...
Searching...
No Matches
TProof.cxx
Go to the documentation of this file.
1
// @(#)root/proof:$Id: a2a50e759072c37ccbc65ecbcce735a76de86e95 $
2
// Author: Fons Rademakers 13/02/97
3
4
/*************************************************************************
5
* Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6
* All rights reserved. *
7
* *
8
* For the licensing terms see $ROOTSYS/LICENSE. *
9
* For the list of contributors see $ROOTSYS/README/CREDITS. *
10
*************************************************************************/
11
12
/**
\defgroup proof PROOF

Classes defining the Parallel ROOT Facility, PROOF, a framework for parallel analysis of ROOT TTrees.

\deprecated
We keep PROOF for those who still need it for legacy use cases.
PROOF is no longer developed and receives only limited support.
For several years now, %ROOT has relied on RDataFrame and related products as its multi-core/multi-processing engines.

*/
23
24
/**
25
\defgroup proofkernel PROOF kernel Libraries
26
\ingroup proof
27
28
The PROOF kernel libraries (libProof, libProofPlayer, libProofDraw) contain the classes defining
29
the kernel of the PROOF facility, i.e. the protocol and the utilities to steer data processing
30
and handling of results.
31
32
*/
33
34
/** \class TProof
\ingroup proofkernel

This class controls a Parallel ROOT Facility (PROOF) cluster.
It starts the worker servers, keeps track of how many workers are
running and of their status, broadcasts messages to all workers,
collects results, etc.

*/
43
44
#include <stdlib.h>
45
#include <fcntl.h>
46
#include <errno.h>
47
#ifdef WIN32
48
# include <io.h>
49
# include <sys/stat.h>
50
# include <sys/types.h>
51
# include "snprintf.h"
52
#else
53
# include <unistd.h>
54
#endif
55
#include <vector>
56
57
#include "RConfigure.h"
58
#include "
Riostream.h
"
59
#include "Getline.h"
60
#include "
TBrowser.h
"
61
#include "
TChain.h
"
62
#include "
TCondor.h
"
63
#include "
TDSet.h
"
64
#include "
TError.h
"
65
#include "
TEnv.h
"
66
#include "
TEntryList.h
"
67
#include "
TEventList.h
"
68
#include "
TFile.h
"
69
#include "
TFileInfo.h
"
70
#include "
TFTP.h
"
71
#include "
THashList.h
"
72
#include "
TInterpreter.h
"
73
#include "
TKey.h
"
74
#include "
TMap.h
"
75
#include "
TMath.h
"
76
#include "
TMessage.h
"
77
#include "
TMonitor.h
"
78
#include "
TObjArray.h
"
79
#include "
TObjString.h
"
80
#include "
TParameter.h
"
81
#include "
TProof.h
"
82
#include "
TProofNodeInfo.h
"
83
#include "
TProofOutputFile.h
"
84
#include "
TVirtualProofPlayer.h
"
85
#include "
TVirtualPacketizer.h
"
86
#include "
TProofServ.h
"
87
#include "
TPluginManager.h
"
88
#include "
TQueryResult.h
"
89
#include "
TRandom.h
"
90
#include "
TRegexp.h
"
91
#include "
TROOT.h
"
92
#include "
TSlave.h
"
93
#include "
TSocket.h
"
94
#include "
TSortedList.h
"
95
#include "
TSystem.h
"
96
#include "
TTree.h
"
97
#include "
TUrl.h
"
98
#include "
TFileCollection.h
"
99
#include "
TDataSetManager.h
"
100
#include "
TDataSetManagerFile.h
"
101
#include "
TMacro.h
"
102
#include "
TSelector.h
"
103
#include "
TPRegexp.h
"
104
#include "
TPackMgr.h
"
105
106
#include <mutex>
107
108
TProof
*
gProof
= 0;
109
110
// Rotating indicator
111
char
TProofMergePrg::fgCr
[4] = {
'-'
,
'\\'
,
'|'
,
'/'
};
112
113
TList
*
TProof::fgProofEnvList
= 0;
// List of env vars for proofserv
114
TPluginHandler
*
TProof::fgLogViewer
= 0;
// Log viewer handler
115
116
ClassImp
(
TProof
);
117
118
//----- PROOF Interrupt signal handler -----------------------------------------
119
////////////////////////////////////////////////////////////////////////////////
120
/// TProof interrupt handler.
121
122
Bool_t
TProofInterruptHandler::Notify
()
123
{
124
if
(!
fProof
->
IsTty
() ||
fProof
->
GetRemoteProtocol
() < 22) {
125
126
// Cannot ask the user : abort any remote processing
127
fProof
->
StopProcess
(
kTRUE
);
128
129
}
else
{
130
// Real stop or request to switch to asynchronous?
131
const
char
*
a
= 0;
132
if
(
fProof
->
GetRemoteProtocol
() < 22) {
133
a
=
Getline
(
"\nSwitch to asynchronous mode not supported remotely:"
134
"\nEnter S/s to stop, Q/q to quit, any other key to continue: "
);
135
}
else
{
136
a
=
Getline
(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
137
" any other key to continue: "
);
138
}
139
if
(
a
[0] ==
'Q'
||
a
[0] ==
'S'
||
a
[0] ==
'q'
||
a
[0] ==
's'
) {
140
141
Info
(
"Notify"
,
"Processing interrupt signal ... %c"
,
a
[0]);
142
143
// Stop or abort any remote processing
144
Bool_t
abort = (
a
[0] ==
'Q'
||
a
[0] ==
'q'
) ?
kTRUE
:
kFALSE
;
145
fProof
->
StopProcess
(abort);
146
147
}
else
if
((
a
[0] ==
'A'
||
a
[0] ==
'a'
) &&
fProof
->
GetRemoteProtocol
() >= 22) {
148
// Stop any remote processing
149
fProof
->
GoAsynchronous
();
150
}
151
}
152
153
return
kTRUE
;
154
}
155
156
//----- Input handler for messages from TProofServ -----------------------------
157
////////////////////////////////////////////////////////////////////////////////
158
/// Constructor
159
160
TProofInputHandler::TProofInputHandler
(
TProof
*
p
,
TSocket
*s)
161
:
TFileHandler
(s->GetDescriptor(),1),
162
fSocket(s), fProof(
p
)
163
{
164
}
165
166
////////////////////////////////////////////////////////////////////////////////
167
/// Handle input
168
169
Bool_t
TProofInputHandler::Notify
()
170
{
171
fProof
->
CollectInputFrom
(
fSocket
);
172
return
kTRUE
;
173
}
174
175
176
//------------------------------------------------------------------------------
177
178
// ROOT ClassImp declaration for TSlaveInfo.
ClassImp(TSlaveInfo);
179
180
////////////////////////////////////////////////////////////////////////////////
181
/// Used to sort slaveinfos by ordinal.
182
183
Int_t
TSlaveInfo::Compare
(
const
TObject
*obj)
const
184
{
185
if
(!obj)
return
1;
186
187
const
TSlaveInfo
*
si
=
dynamic_cast<
const
TSlaveInfo
*
>
(obj);
188
189
if
(!
si
)
return
fOrdinal
.
CompareTo
(obj->
GetName
());
190
191
const
char
*
myord
=
GetOrdinal
();
192
const
char
*
otherord
=
si
->GetOrdinal();
193
while
(
myord
&&
otherord
) {
194
Int_t
myval
= atoi(
myord
);
195
Int_t
otherval
= atoi(
otherord
);
196
if
(
myval
<
otherval
)
return
1;
197
if
(
myval
>
otherval
)
return
-1;
198
myord
=
strchr
(
myord
,
'.'
);
199
if
(
myord
)
myord
++;
200
otherord
=
strchr
(
otherord
,
'.'
);
201
if
(
otherord
)
otherord
++;
202
}
203
if
(
myord
)
return
-1;
204
if
(
otherord
)
return
1;
205
return
0;
206
}
207
208
////////////////////////////////////////////////////////////////////////////////
209
/// Used to compare slaveinfos by ordinal.
210
211
Bool_t
TSlaveInfo::IsEqual
(
const
TObject
* obj)
const
212
{
213
if
(!obj)
return
kFALSE
;
214
const
TSlaveInfo
*
si
=
dynamic_cast<
const
TSlaveInfo
*
>
(obj);
215
if
(!
si
)
return
kFALSE
;
216
return
(
strcmp
(
GetOrdinal
(),
si
->GetOrdinal()) == 0);
217
}
218
219
////////////////////////////////////////////////////////////////////////////////
220
/// Print slave info. If opt = "active" print only the active
221
/// slaves, if opt="notactive" print only the not active slaves,
222
/// if opt = "bad" print only the bad slaves, else
223
/// print all slaves.
224
225
void
TSlaveInfo::Print
(
Option_t
*opt)
const
226
{
227
TString
stat =
fStatus
==
kActive
?
"active"
:
228
fStatus
==
kBad
?
"bad"
:
229
"not active"
;
230
231
Bool_t
newfmt
=
kFALSE
;
232
TString
oo
(opt);
233
if
(
oo
.Contains(
"N"
)) {
234
newfmt
=
kTRUE
;
235
oo
.ReplaceAll(
"N"
,
""
);
236
}
237
if
(
oo
==
"active"
&&
fStatus
!=
kActive
)
return
;
238
if
(
oo
==
"notactive"
&&
fStatus
!=
kNotActive
)
return
;
239
if
(
oo
==
"bad"
&&
fStatus
!=
kBad
)
return
;
240
241
if
(
newfmt
) {
242
TString
msd
,
si
,
datadir
;
243
if
(!(
fMsd
.
IsNull
()))
msd
.Form(
"| msd: %s "
,
fMsd
.
Data
());
244
if
(!(
fDataDir
.
IsNull
()))
datadir
.Form(
"| datadir: %s "
,
fDataDir
.
Data
());
245
if
(
fSysInfo
.
fCpus
> 0) {
246
si
.Form(
"| %s, %d cores, %d MB ram"
,
fHostName
.
Data
(),
247
fSysInfo
.
fCpus
,
fSysInfo
.
fPhysRam
);
248
}
else
{
249
si
.Form(
"| %s"
,
fHostName
.
Data
());
250
}
251
Printf
(
"Worker: %9s %s %s%s| %s"
,
fOrdinal
.
Data
(),
si
.Data(),
msd
.Data(),
datadir
.Data(), stat.
Data
());
252
253
}
else
{
254
TString
msd
=
fMsd
.
IsNull
() ?
"<null>"
:
fMsd
.
Data
();
255
256
std::cout <<
"Slave: "
<<
fOrdinal
257
<<
" hostname: "
<<
fHostName
258
<<
" msd: "
<<
msd
259
<<
" perf index: "
<<
fPerfIndex
260
<<
" "
<< stat
261
<< std::endl;
262
}
263
}
264
265
////////////////////////////////////////////////////////////////////////////////
266
/// Setter for fSysInfo
267
268
void
TSlaveInfo::SetSysInfo
(
SysInfo_t
si
)
269
{
270
fSysInfo
.
fOS
=
si
.fOS;
// OS
271
fSysInfo
.
fModel
=
si
.fModel;
// computer model
272
fSysInfo
.
fCpuType
=
si
.fCpuType;
// type of cpu
273
fSysInfo
.
fCpus
=
si
.fCpus;
// number of cpus
274
fSysInfo
.
fCpuSpeed
=
si
.fCpuSpeed;
// cpu speed in MHz
275
fSysInfo
.
fBusSpeed
=
si
.fBusSpeed;
// bus speed in MHz
276
fSysInfo
.
fL2Cache
=
si
.fL2Cache;
// level 2 cache size in KB
277
fSysInfo
.
fPhysRam
=
si
.fPhysRam;
// Physical RAM
278
}
279
280
// TProof's ClassImp is declared once, next to the global definitions near the
// top of this file; repeating it here only re-recorded the implementation line.
281
282
//------------------------------------------------------------------------------
283
284
////////////////////////////////////////////////////////////////////////////////
285
/// Destructor
286
287
TMergerInfo::~TMergerInfo
()
288
{
289
// Just delete the list, the objects are owned by other list
290
if
(
fWorkers
) {
291
fWorkers
->
SetOwner
(
kFALSE
);
292
SafeDelete
(
fWorkers
);
293
}
294
}
295
////////////////////////////////////////////////////////////////////////////////
296
/// Increase number of already merged workers by 1
297
298
void
TMergerInfo::SetMergedWorker
()
299
{
300
if
(
AreAllWorkersMerged
())
301
Error
(
"SetMergedWorker"
,
"all workers have been already merged before!"
);
302
else
303
fMergedWorkers
++;
304
}
305
306
////////////////////////////////////////////////////////////////////////////////
307
/// Add new worker to the list of workers to be merged by this merger
308
309
void
TMergerInfo::AddWorker
(
TSlave
*
sl
)
310
{
311
if
(!
fWorkers
)
312
fWorkers
=
new
TList
();
313
if
(
fWorkersToMerge
==
fWorkers
->
GetSize
()) {
314
Error
(
"AddWorker"
,
"all workers have been already assigned to this merger"
);
315
return
;
316
}
317
fWorkers
->
Add
(
sl
);
318
}
319
320
////////////////////////////////////////////////////////////////////////////////
321
/// Return if merger has already merged all workers, i.e. if it has finished its merging job
322
323
Bool_t
TMergerInfo::AreAllWorkersMerged
()
324
{
325
return
(
fWorkersToMerge
==
fMergedWorkers
);
326
}
327
328
////////////////////////////////////////////////////////////////////////////////
329
/// Return if the determined number of workers has been already assigned to this merger
330
331
Bool_t
TMergerInfo::AreAllWorkersAssigned
()
332
{
333
if
(!
fWorkers
)
334
return
kFALSE
;
335
336
return
(
fWorkers
->
GetSize
() ==
fWorkersToMerge
);
337
}
338
339
////////////////////////////////////////////////////////////////////////////////
340
/// This a private API function.
341
/// It checks whether the connection string contains a PoD cluster protocol.
342
/// If it does, then the connection string will be changed to reflect
343
/// a real PROOF connection string for a PROOF cluster managed by PoD.
344
/// PoD: http://pod.gsi.de .
345
/// Return -1 if the PoD request failed; return 0 otherwise.
346
347
static
Int_t
PoDCheckUrl
(
TString
*
_cluster
)
348
{
349
if
( !
_cluster
)
350
return
0;
351
352
// trim spaces from both sides of the string
353
*
_cluster
=
_cluster
->Strip(
TString::kBoth
);
354
// PoD protocol string
355
const
TString
pod_prot
(
"pod"
);
356
357
// URL test
358
// TODO: The URL test is to support remote PoD servers (not managed by pod-remote)
359
TUrl
url
(
_cluster
->Data() );
360
if
(
pod_prot
.CompareTo(
url
.GetProtocol(),
TString::kIgnoreCase
) )
361
return
0;
362
363
// PoD cluster is used
364
// call pod-info in a batch mode (-b).
365
// pod-info will find either a local PoD cluster or
366
// a remote one, manged by pod-remote.
367
*
_cluster
=
gSystem
->
GetFromPipe
(
"pod-info -c -b"
);
368
if
( 0 ==
_cluster
->Length() ) {
369
Error
(
"PoDCheckUrl"
,
"PoD server is not running"
);
370
return
-1;
371
}
372
return
0;
373
}
374
375
////////////////////////////////////////////////////////////////////////////////
376
/// Create a PROOF environment. Starting PROOF involves either connecting
377
/// to a master server, which in turn will start a set of slave servers, or
378
/// directly starting as master server (if master = ""). Masterurl is of
379
/// the form: [proof[s]://]host[:port]. Conffile is the name of the config
380
/// file describing the remote PROOF cluster (this argument alows you to
381
/// describe different cluster configurations).
382
/// The default is proof.conf. Confdir is the directory where the config
383
/// file and other PROOF related files are (like motd and noproof files).
384
/// Loglevel is the log level (default = 1). User specified custom config
385
/// files will be first looked for in $HOME/.conffile.
386
387
TProof::TProof
(
const
char
*
masterurl
,
const
char
*
conffile
,
const
char
*
confdir
,
388
Int_t
loglevel
,
const
char
*
alias
,
TProofMgr
*
mgr
)
389
: fUrl(
masterurl
)
390
{
391
// Default initializations
392
InitMembers
();
393
394
// This may be needed during init
395
fManager
=
mgr
;
396
397
// Default server type
398
fServType
=
TProofMgr::kXProofd
;
399
400
// Default query mode
401
fQueryMode
=
kSync
;
402
403
// Parse the main URL, adjusting the missing fields and setting the relevant
404
// bits
405
ResetBit
(
TProof::kIsClient
);
406
ResetBit
(
TProof::kIsMaster
);
407
408
// Protocol and Host
409
if
(!
masterurl
||
strlen
(
masterurl
) <= 0) {
410
fUrl
.
SetProtocol
(
"proof"
);
411
fUrl
.
SetHost
(
"__master__"
);
412
}
else
if
(!(
strstr
(
masterurl
,
"://"
))) {
413
fUrl
.
SetProtocol
(
"proof"
);
414
}
415
// Port
416
if
(
fUrl
.
GetPort
() ==
TUrl
(
" "
).
GetPort
())
417
fUrl
.
SetPort
(
TUrl
(
"proof:// "
).
GetPort
());
418
419
// Make sure to store the FQDN, so to get a solid reference for subsequent checks
420
if
(!
strcmp
(
fUrl
.
GetHost
(),
"__master__"
))
421
fMaster
=
fUrl
.
GetHost
();
422
else
if
(!
strlen
(
fUrl
.
GetHost
()))
423
fMaster
=
gSystem
->
GetHostByName
(
gSystem
->
HostName
()).GetHostName();
424
else
425
fMaster
=
gSystem
->
GetHostByName
(
fUrl
.
GetHost
()).GetHostName();
426
427
// Server type
428
if
(
strlen
(
fUrl
.
GetOptions
()) > 0) {
429
TString
opts
(
fUrl
.
GetOptions
());
430
if
(!(
strncmp
(
fUrl
.
GetOptions
(),
"std"
,3))) {
431
fServType
=
TProofMgr::kProofd
;
432
opts
.Remove(0,3);
433
fUrl
.
SetOptions
(
opts
.Data());
434
}
else
if
(!(
strncmp
(
fUrl
.
GetOptions
(),
"lite"
,4))) {
435
fServType
=
TProofMgr::kProofLite
;
436
opts
.Remove(0,4);
437
fUrl
.
SetOptions
(
opts
.Data());
438
}
439
}
440
441
// Instance type
442
fMasterServ
=
kFALSE
;
443
SetBit
(
TProof::kIsClient
);
444
ResetBit
(
TProof::kIsMaster
);
445
if
(
fMaster
==
"__master__"
) {
446
fMasterServ
=
kTRUE
;
447
ResetBit
(
TProof::kIsClient
);
448
SetBit
(
TProof::kIsMaster
);
449
}
else
if
(
fMaster
==
"prooflite"
) {
450
// Client and master are merged
451
fMasterServ
=
kTRUE
;
452
SetBit
(
TProof::kIsMaster
);
453
}
454
// Flag that we are a client
455
if
(
TestBit
(
TProof::kIsClient
))
456
if
(!
gSystem
->
Getenv
(
"ROOTPROOFCLIENT"
))
gSystem
->
Setenv
(
"ROOTPROOFCLIENT"
,
""
);
457
458
Init
(
masterurl
,
conffile
,
confdir
,
loglevel
,
alias
);
459
460
// If the user was not set, get it from the master
461
if
(
strlen
(
fUrl
.
GetUser
()) <= 0) {
462
TString
usr
,
emsg
;
463
if
(
Exec
(
"gProofServ->GetUser()"
,
"0"
,
kTRUE
) == 0) {
464
TObjString
*os =
fMacroLog
.
GetLineWith
(
"const char"
);
465
if
(os) {
466
Ssiz_t
fst
= os->GetString().First(
'\"'
);
467
Ssiz_t
lst
= os->GetString().Last(
'\"'
);
468
usr
= os->GetString()(
fst
+1,
lst
-
fst
-1);
469
}
else
{
470
emsg
=
"could not find 'const char *' string in macro log"
;
471
}
472
}
else
{
473
emsg
=
"could not retrieve user info"
;
474
}
475
if
(!
emsg
.IsNull()) {
476
// Get user logon name
477
UserGroup_t
*
pw
=
gSystem
->
GetUserInfo
();
478
if
(
pw
) {
479
usr
=
pw
->fUser;
480
delete
pw
;
481
}
482
Warning
(
"TProof"
,
"%s: using local default %s"
,
emsg
.Data(),
usr
.Data());
483
}
484
// Set the user name in the main URL
485
fUrl
.
SetUser
(
usr
.Data());
486
}
487
488
// If called by a manager, make sure it stays in last position
489
// for cleaning
490
if
(
mgr
) {
491
R__LOCKGUARD
(
gROOTMutex
);
492
gROOT
->GetListOfSockets()->Remove(
mgr
);
493
gROOT
->GetListOfSockets()->Add(
mgr
);
494
}
495
496
// Old-style server type: we add this to the list and set the global pointer
497
if
(
IsProofd
() ||
TestBit
(
TProof::kIsMaster
))
498
if
(!
gROOT
->GetListOfProofs()->FindObject(
this
))
499
gROOT
->GetListOfProofs()->Add(
this
);
500
501
// Still needed by the packetizers: needs to be changed
502
gProof
=
this
;
503
}
504
505
////////////////////////////////////////////////////////////////////////////////
506
/// Protected constructor to be used by classes deriving from TProof
507
/// (they have to call Init themselves and override StartSlaves
508
/// appropriately).
509
///
510
/// This constructor simply closes any previous gProof and sets gProof
511
/// to this instance.
512
513
TProof::TProof
() : fUrl(
""
), fServType(
TProofMgr
::kXProofd)
514
{
515
// Default initializations
516
InitMembers
();
517
518
if
(!
gROOT
->GetListOfProofs()->FindObject(
this
))
519
gROOT
->GetListOfProofs()->Add(
this
);
520
521
gProof
=
this
;
522
}
523
524
////////////////////////////////////////////////////////////////////////////////
525
/// Default initializations
526
527
void
TProof::InitMembers
()
528
{
529
fValid
=
kFALSE
;
530
fTty
=
kFALSE
;
531
fRecvMessages
= 0;
532
fSlaveInfo
= 0;
533
fMasterServ
=
kFALSE
;
534
fSendGroupView
=
kFALSE
;
535
fIsPollingWorkers
=
kFALSE
;
536
fLastPollWorkers_s
= -1;
537
fActiveSlaves
= 0;
538
fInactiveSlaves
= 0;
539
fUniqueSlaves
= 0;
540
fAllUniqueSlaves
= 0;
541
fNonUniqueMasters
= 0;
542
fActiveMonitor
= 0;
543
fUniqueMonitor
= 0;
544
fAllUniqueMonitor
= 0;
545
fCurrentMonitor
= 0;
546
fBytesRead
= 0;
547
fRealTime
= 0;
548
fCpuTime
= 0;
549
fIntHandler
= 0;
550
fProgressDialog
= 0;
551
fProgressDialogStarted
=
kFALSE
;
552
SetBit
(
kUseProgressDialog
);
553
fPlayer
= 0;
554
fFeedback
= 0;
555
fChains
= 0;
556
fDSet
= 0;
557
fNotIdle
= 0;
558
fSync
=
kTRUE
;
559
fRunStatus
=
kRunning
;
560
fIsWaiting
=
kFALSE
;
561
fRedirLog
=
kFALSE
;
562
fLogFileW
= 0;
563
fLogFileR
= 0;
564
fLogToWindowOnly
=
kFALSE
;
565
fSaveLogToMacro
=
kFALSE
;
566
fMacroLog
.
SetName
(
"ProofLogMacro"
);
567
568
fWaitingSlaves
= 0;
569
fQueries
= 0;
570
fOtherQueries
= 0;
571
fDrawQueries
= 0;
572
fMaxDrawQueries
= 1;
573
fSeqNum
= 0;
574
575
fSessionID
= -1;
576
fEndMaster
=
kFALSE
;
577
578
fPackMgr
= 0;
579
fEnabledPackagesOnCluster
= 0;
580
581
fInputData
= 0;
582
583
fPrintProgress
= 0;
584
585
fLoadedMacros
= 0;
586
587
fProtocol
= -1;
588
fSlaves
= 0;
589
fTerminatedSlaveInfos
= 0;
590
fBadSlaves
= 0;
591
fAllMonitor
= 0;
592
fDataReady
=
kFALSE
;
593
fBytesReady
= 0;
594
fTotalBytes
= 0;
595
fAvailablePackages
= 0;
596
fEnabledPackages
= 0;
597
fRunningDSets
= 0;
598
599
fCollectTimeout
= -1;
600
601
fManager
= 0;
602
fQueryMode
=
kSync
;
603
fDynamicStartup
=
kFALSE
;
604
605
fMergersSet
=
kFALSE
;
606
fMergersByHost
=
kFALSE
;
607
fMergers
= 0;
608
fMergersCount
= -1;
609
fLastAssignedMerger
= 0;
610
fWorkersToMerge
= 0;
611
fFinalizationRunning
=
kFALSE
;
612
613
fPerfTree
=
""
;
614
615
fWrksOutputReady
= 0;
616
617
fSelector
= 0;
618
619
fPrepTime
= 0.;
620
621
// Check if the user defined a list of environment variables to send over:
622
// include them into the dedicated list
623
if
(
gSystem
->
Getenv
(
"PROOF_ENVVARS"
)) {
624
TString
envs
(
gSystem
->
Getenv
(
"PROOF_ENVVARS"
)),
env
,
envsfound
;
625
Int_t
from = 0;
626
while
(
envs
.Tokenize(
env
, from,
","
)) {
627
if
(!
env
.IsNull()) {
628
if
(!
gSystem
->
Getenv
(
env
)) {
629
Warning
(
"Init"
,
"request for sending over undefined environemnt variable '%s' - ignoring"
,
env
.Data());
630
}
else
{
631
if
(!
envsfound
.IsNull())
envsfound
+=
","
;
632
envsfound
+=
env
;
633
TProof::DelEnvVar
(
env
);
634
TProof::AddEnvVar
(
env
,
gSystem
->
Getenv
(
env
));
635
}
636
}
637
}
638
if
(
envsfound
.IsNull()) {
639
Warning
(
"Init"
,
"none of the requested env variables were found: '%s'"
,
envs
.Data());
640
}
else
{
641
Info
(
"Init"
,
"the following environment variables have been added to the list to be sent to the nodes: '%s'"
,
envsfound
.Data());
642
}
643
}
644
645
// Done
646
return
;
647
}
648
649
////////////////////////////////////////////////////////////////////////////////
650
/// Clean up PROOF environment.
651
652
TProof::~TProof
()
653
{
654
if
(
fChains
) {
655
while
(
TChain
*
chain
=
dynamic_cast<
TChain
*
>
(
fChains
->
First
()) ) {
656
// remove "chain" from list
657
chain
->SetProof(0);
658
RemoveChain
(
chain
);
659
}
660
}
661
662
// remove links to packages enabled on the client
663
if
(
TestBit
(
TProof::kIsClient
)) {
664
// iterate over all packages
665
TList
*
epl
=
fPackMgr
->
GetListOfEnabled
();
666
if
(
epl
) {
667
TIter
nxp
(
epl
);
668
while
(
TObjString
*
pck
= (
TObjString
*)(
nxp
())) {
669
FileStat_t
stat;
670
if
(
gSystem
->
GetPathInfo
(
pck
->String(), stat) == 0) {
671
// check if symlink, if so unlink
672
// NOTE: GetPathInfo() returns 1 in case of symlink that does not point to
673
// existing file or to a directory, but if fIsLink is true the symlink exists
674
if
(stat.
fIsLink
)
675
gSystem
->
Unlink
(
pck
->String());
676
}
677
}
678
epl
->Delete();
679
delete
epl
;
680
}
681
}
682
683
Close
();
684
SafeDelete
(
fIntHandler
);
685
SafeDelete
(
fSlaves
);
686
SafeDelete
(
fActiveSlaves
);
687
SafeDelete
(
fInactiveSlaves
);
688
SafeDelete
(
fUniqueSlaves
);
689
SafeDelete
(
fAllUniqueSlaves
);
690
SafeDelete
(
fNonUniqueMasters
);
691
SafeDelete
(
fTerminatedSlaveInfos
);
692
SafeDelete
(
fBadSlaves
);
693
SafeDelete
(
fAllMonitor
);
694
SafeDelete
(
fActiveMonitor
);
695
SafeDelete
(
fUniqueMonitor
);
696
SafeDelete
(
fAllUniqueMonitor
);
697
SafeDelete
(
fSlaveInfo
);
698
SafeDelete
(
fChains
);
699
SafeDelete
(
fPlayer
);
700
SafeDelete
(
fFeedback
);
701
SafeDelete
(
fWaitingSlaves
);
702
SafeDelete
(
fAvailablePackages
);
703
SafeDelete
(
fEnabledPackages
);
704
SafeDelete
(
fLoadedMacros
);
705
SafeDelete
(
fPackMgr
);
706
SafeDelete
(
fRecvMessages
);
707
SafeDelete
(
fInputData
);
708
SafeDelete
(
fRunningDSets
);
709
SafeDelete
(
fEnabledPackagesOnCluster
);
710
if
(
fWrksOutputReady
) {
711
fWrksOutputReady
->
SetOwner
(
kFALSE
);
712
delete
fWrksOutputReady
;
713
}
714
if
(
fQueries
) {
715
fQueries
->
Delete
();
716
delete
fQueries
;
717
}
718
719
// remove file with redirected logs
720
if
(
TestBit
(
TProof::kIsClient
)) {
721
if
(
fLogFileR
)
722
fclose
(
fLogFileR
);
723
if
(
fLogFileW
)
724
fclose
(
fLogFileW
);
725
if
(
fLogFileName
.
Length
() > 0)
726
gSystem
->
Unlink
(
fLogFileName
);
727
}
728
729
// Remove for the global list
730
gROOT
->GetListOfProofs()->Remove(
this
);
731
// ... and from the manager list
732
if
(
fManager
&&
fManager
->
IsValid
())
733
fManager
->
DiscardSession
(
this
);
734
735
if
(
gProof
&&
gProof
==
this
) {
736
// Set previous as default
737
TIter
pvp
(
gROOT
->GetListOfProofs(),
kIterBackward
);
738
while
((
gProof
= (
TProof
*)
pvp
())) {
739
if
(
gProof
->
InheritsFrom
(
TProof::Class
()))
740
break
;
741
}
742
}
743
744
// For those interested in our destruction ...
745
Emit
(
"~TProof()"
);
746
Emit
(
"CloseWindow()"
);
747
}
748
749
////////////////////////////////////////////////////////////////////////////////
750
/// Start the PROOF environment. Starting PROOF involves either connecting
751
/// to a master server, which in turn will start a set of slave servers, or
752
/// directly starting as master server (if master = ""). For a description
753
/// of the arguments see the TProof ctor. Returns the number of started
754
/// master or slave servers, returns 0 in case of error, in which case
755
/// fValid remains false.
756
757
Int_t
TProof::Init
(
const
char
*,
const
char
*
conffile
,
758
const
char
*
confdir
,
Int_t
loglevel
,
const
char
*
alias
)
759
{
760
R__ASSERT
(
gSystem
);
761
762
fValid
=
kFALSE
;
763
764
// Connected to terminal?
765
fTty
= (
isatty
(0) == 0 ||
isatty
(1) == 0) ?
kFALSE
:
kTRUE
;
766
767
// If in attach mode, options is filled with additional info
768
Bool_t
attach =
kFALSE
;
769
if
(
strlen
(
fUrl
.
GetOptions
()) > 0) {
770
attach =
kTRUE
;
771
// A flag from the GUI
772
TString
opts
=
fUrl
.
GetOptions
();
773
if
(
opts
.Contains(
"GUI"
)) {
774
SetBit
(
TProof::kUsingSessionGui
);
775
opts
.Remove(
opts
.Index(
"GUI"
));
776
fUrl
.
SetOptions
(
opts
);
777
}
778
}
779
780
if
(
TestBit
(
TProof::kIsMaster
)) {
781
// Fill default conf file and conf dir
782
if
(!
conffile
|| !
conffile
[0])
783
fConfFile
=
kPROOF_ConfFile
;
784
if
(!
confdir
|| !
confdir
[0])
785
fConfDir
=
kPROOF_ConfDir
;
786
// The group; the client receives it in the kPROOF_SESSIONTAG message
787
if
(
gProofServ
)
fGroup
=
gProofServ
->
GetGroup
();
788
}
else
{
789
fConfDir
=
confdir
;
790
fConfFile
=
conffile
;
791
}
792
793
// Analysise the conffile field
794
if
(
fConfFile
.
Contains
(
"workers=0"
))
fConfFile
.
ReplaceAll
(
"workers=0"
,
"masteronly"
);
795
ParseConfigField
(
fConfFile
);
796
797
fWorkDir
=
gSystem
->
WorkingDirectory
();
798
fLogLevel
=
loglevel
;
799
fProtocol
=
kPROOF_Protocol
;
800
fSendGroupView
=
kTRUE
;
801
fImage
=
fMasterServ
?
""
:
"<local>"
;
802
fIntHandler
= 0;
803
fStatus
= 0;
804
fRecvMessages
=
new
TList
;
805
fRecvMessages
->
SetOwner
(
kTRUE
);
806
fSlaveInfo
= 0;
807
fChains
=
new
TList
;
808
fAvailablePackages
= 0;
809
fEnabledPackages
= 0;
810
fRunningDSets
= 0;
811
fEndMaster
=
TestBit
(
TProof::kIsMaster
) ?
kTRUE
:
kFALSE
;
812
fInputData
= 0;
813
ResetBit
(
TProof::kNewInputData
);
814
fPrintProgress
= 0;
815
816
// Timeout for some collect actions
817
fCollectTimeout
=
gEnv
->
GetValue
(
"Proof.CollectTimeout"
, -1);
818
819
// Should the workers be started dynamically; default: no
820
fDynamicStartup
=
gEnv
->
GetValue
(
"Proof.DynamicStartup"
,
kFALSE
);
821
822
// Default entry point for the data pool is the master
823
if
(
TestBit
(
TProof::kIsClient
))
824
fDataPoolUrl
.
Form
(
"root://%s"
,
fMaster
.
Data
());
825
else
826
fDataPoolUrl
=
""
;
827
828
fProgressDialog
= 0;
829
fProgressDialogStarted
=
kFALSE
;
830
831
// Default alias is the master name
832
TString
al
= (
alias
) ?
alias
:
fMaster
.
Data
();
833
SetAlias
(
al
);
834
835
// Client logging of messages from the master and slaves
836
fRedirLog
=
kFALSE
;
837
if
(
TestBit
(
TProof::kIsClient
)) {
838
fLogFileName
.
Form
(
"%s/ProofLog_%d"
,
gSystem
->
TempDirectory
(),
gSystem
->
GetPid
());
839
if
((
fLogFileW
=
fopen
(
fLogFileName
,
"w"
)) == 0)
840
Error
(
"Init"
,
"could not create temporary logfile"
);
841
if
((
fLogFileR
=
fopen
(
fLogFileName
,
"r"
)) == 0)
842
Error
(
"Init"
,
"could not open temp logfile for reading"
);
843
}
844
fLogToWindowOnly
=
kFALSE
;
845
846
// Status of cluster
847
fNotIdle
= 0;
848
// Query type
849
fSync
= (attach) ?
kFALSE
:
kTRUE
;
850
// Not enqueued
851
fIsWaiting
=
kFALSE
;
852
853
// Counters
854
fBytesRead
= 0;
855
fRealTime
= 0;
856
fCpuTime
= 0;
857
858
// List of queries
859
fQueries
= 0;
860
fOtherQueries
= 0;
861
fDrawQueries
= 0;
862
fMaxDrawQueries
= 1;
863
fSeqNum
= 0;
864
865
// Remote ID of the session
866
fSessionID
= -1;
867
868
// Part of active query
869
fWaitingSlaves
= 0;
870
871
// Make remote PROOF player
872
fPlayer
= 0;
873
MakePlayer
();
874
875
fFeedback
=
new
TList
;
876
fFeedback
->
SetOwner
();
877
fFeedback
->
SetName
(
"FeedbackList"
);
878
AddInput
(
fFeedback
);
879
880
// sort slaves by descending performance index
881
fSlaves
=
new
TSortedList
(
kSortDescending
);
882
fActiveSlaves
=
new
TList
;
883
fInactiveSlaves
=
new
TList
;
884
fUniqueSlaves
=
new
TList
;
885
fAllUniqueSlaves
=
new
TList
;
886
fNonUniqueMasters
=
new
TList
;
887
fBadSlaves
=
new
TList
;
888
fAllMonitor
=
new
TMonitor
;
889
fActiveMonitor
=
new
TMonitor
;
890
fUniqueMonitor
=
new
TMonitor
;
891
fAllUniqueMonitor
=
new
TMonitor
;
892
fCurrentMonitor
= 0;
893
894
fTerminatedSlaveInfos
=
new
TList
;
895
fTerminatedSlaveInfos
->
SetOwner
(
kTRUE
);
896
897
fLoadedMacros
= 0;
898
fPackMgr
= 0;
899
900
// Enable optimized sending of streamer infos to use embedded backward/forward
901
// compatibility support between different ROOT versions and different versions of
902
// users classes
903
Bool_t
enableSchemaEvolution
=
gEnv
->
GetValue
(
"Proof.SchemaEvolution"
,1);
904
if
(
enableSchemaEvolution
) {
905
TMessage::EnableSchemaEvolutionForAll
();
906
}
else
{
907
Info
(
"TProof"
,
"automatic schema evolution in TMessage explicitly disabled"
);
908
}
909
910
if
(
IsMaster
()) {
911
// to make UploadPackage() method work on the master as well.
912
if
(
gProofServ
)
fPackMgr
=
gProofServ
->
GetPackMgr
();
913
}
else
{
914
915
TString
sandbox
;
916
if
(
GetSandbox
(
sandbox
,
kTRUE
) != 0) {
917
Error
(
"Init"
,
"failure asserting sandbox directory %s"
,
sandbox
.Data());
918
return
0;
919
}
920
921
// Package Dir
922
TString
packdir
=
gEnv
->
GetValue
(
"Proof.PackageDir"
,
""
);
923
if
(
packdir
.IsNull())
924
packdir
.Form(
"%s/%s"
,
sandbox
.Data(),
kPROOF_PackDir
);
925
if
(
AssertPath
(
packdir
,
kTRUE
) != 0) {
926
Error
(
"Init"
,
"failure asserting directory %s"
,
packdir
.Data());
927
return
0;
928
}
929
fPackMgr
=
new
TPackMgr
(
packdir
);
930
if
(
gDebug
> 0)
931
Info
(
"Init"
,
"package directory set to %s"
,
packdir
.Data());
932
}
933
934
if
(!
IsMaster
()) {
935
// List of directories where to look for global packages
936
TString
globpack
=
gEnv
->
GetValue
(
"Proof.GlobalPackageDirs"
,
""
);
937
TProofServ::ResolveKeywords
(
globpack
);
938
Int_t
nglb
=
TPackMgr::RegisterGlobalPath
(
globpack
);
939
if
(
gDebug
> 0)
940
Info
(
"Init"
,
" %d global package directories registered"
,
nglb
);
941
}
942
943
// Master may want dynamic startup
944
if
(
fDynamicStartup
) {
945
if
(!
IsMaster
()) {
946
// If on client - start the master
947
if
(!
StartSlaves
(attach))
948
return
0;
949
}
950
}
else
{
951
952
// Master Only mode (for operations requiring only the master, e.g. dataset browsing,
953
// result retrieving, ...)
954
Bool_t
masterOnly
=
gEnv
->
GetValue
(
"Proof.MasterOnly"
,
kFALSE
);
955
if
(!
IsMaster
() || !
masterOnly
) {
956
// Start slaves (the old, static, per-session way)
957
if
(!
StartSlaves
(attach))
958
return
0;
959
// Client: Is Master in dynamic startup mode?
960
if
(!
IsMaster
()) {
961
Int_t
dyn
= 0;
962
GetRC
(
"Proof.DynamicStartup"
,
dyn
);
963
if
(
dyn
!= 0)
fDynamicStartup
=
kTRUE
;
964
}
965
}
966
}
967
// we are now properly initialized
968
fValid
=
kTRUE
;
969
970
// De-activate monitor (will be activated in Collect)
971
fAllMonitor
->
DeActivateAll
();
972
973
// By default go into parallel mode
974
Int_t
nwrk
=
GetRemoteProtocol
() > 35 ? -1 : 9999;
975
TNamed
*
n
= 0;
976
if
(
TProof::GetEnvVars
() &&
977
(
n
= (
TNamed
*)
TProof::GetEnvVars
()->
FindObject
(
"PROOF_NWORKERS"
))) {
978
TString
s(
n
->GetTitle());
979
if
(s.
IsDigit
())
nwrk
= s.
Atoi
();
980
}
981
GoParallel
(
nwrk
, attach);
982
983
// Send relevant initial state to slaves
984
if
(!attach)
985
SendInitialState
();
986
else
if
(!
IsIdle
())
987
// redirect log
988
fRedirLog
=
kTRUE
;
989
990
// Done at this point, the alias will be communicated to the coordinator, if any
991
if
(
TestBit
(
TProof::kIsClient
))
992
SetAlias
(
al
);
993
994
SetActive
(
kFALSE
);
995
996
if
(
IsValid
()) {
997
998
// Activate input handler
999
ActivateAsyncInput
();
1000
1001
R__LOCKGUARD
(
gROOTMutex
);
1002
gROOT
->GetListOfSockets()->Add(
this
);
1003
}
1004
1005
AskParallel
();
1006
1007
return
fActiveSlaves
->
GetSize
();
1008
}
1009
1010
////////////////////////////////////////////////////////////////////////////////
1011
/// Set the sandbox path from ' Proof.Sandbox' or the alternative var 'rc'.
1012
/// Use the existing setting or the default if nothing is found.
1013
/// If 'assert' is kTRUE, make also sure that the path exists.
1014
/// Return 0 on success, -1 on failure
1015
1016
Int_t
TProof::GetSandbox
(
TString
&
sb
,
Bool_t
assert
,
const
char
*
rc
)
1017
{
1018
// Get it from 'rc', if defined
1019
if
(
rc
&&
strlen
(
rc
))
sb
=
gEnv
->
GetValue
(
rc
,
sb
);
1020
// Or use the default 'rc'
1021
if
(
sb
.IsNull())
sb
=
gEnv
->
GetValue
(
"Proof.Sandbox"
,
""
);
1022
// If nothing found , use the default
1023
if
(
sb
.IsNull())
sb
.Form(
"~/%s"
,
kPROOF_WorkDir
);
1024
// Expand special settings
1025
if
(
sb
==
"."
) {
1026
sb
=
gSystem
->
pwd
();
1027
}
else
if
(
sb
==
".."
) {
1028
sb
=
gSystem
->
GetDirName
(
gSystem
->
pwd
());
1029
}
1030
gSystem
->
ExpandPathName
(
sb
);
1031
1032
// Assert the path, if required
1033
if
(
assert
&&
AssertPath
(
sb
,
kTRUE
) != 0)
return
-1;
1034
// Done
1035
return
0;
1036
}
1037
1038
////////////////////////////////////////////////////////////////////////////////
1039
/// The config file field may contain special instructions which need to be
1040
/// parsed at the beginning, e.g. for debug runs with valgrind.
1041
/// Several options can be given separated by a ','
1042
1043
void
TProof::ParseConfigField
(
const
char
*config)
1044
{
1045
TString
sconf
(config), opt;
1046
Ssiz_t
from = 0;
1047
#ifdef R__LINUX
1048
Bool_t
cpuPin
=
kFALSE
;
1049
#endif
1050
1051
// Analysise the field
1052
const
char
*
cq
= (
IsLite
()) ?
"\""
:
""
;
1053
while
(
sconf
.Tokenize(opt, from,
","
)) {
1054
if
(opt.
IsNull
())
continue
;
1055
1056
if
(opt.
BeginsWith
(
"valgrind"
)) {
1057
// Any existing valgrind setting? User can give full settings, which we fully respect,
1058
// or pass additional options for valgrind by prefixing 'valgrind_opts:'. For example,
1059
// TProof::AddEnvVar("PROOF_MASTER_WRAPPERCMD", "valgrind_opts:--time-stamp --leak-check=full"
1060
// will add option "--time-stamp --leak-check=full" to our default options
1061
TString
mst
, top, sub,
wrk
,
all
;
1062
TList
*
envs
=
fgProofEnvList
;
1063
TNamed
*
n
= 0;
1064
if
(
envs
) {
1065
if
((
n
= (
TNamed
*)
envs
->FindObject(
"PROOF_WRAPPERCMD"
)))
1066
all
=
n
->GetTitle();
1067
if
((
n
= (
TNamed
*)
envs
->FindObject(
"PROOF_MASTER_WRAPPERCMD"
)))
1068
mst
=
n
->GetTitle();
1069
if
((
n
= (
TNamed
*)
envs
->FindObject(
"PROOF_TOPMASTER_WRAPPERCMD"
)))
1070
top =
n
->GetTitle();
1071
if
((
n
= (
TNamed
*)
envs
->FindObject(
"PROOF_SUBMASTER_WRAPPERCMD"
)))
1072
sub =
n
->GetTitle();
1073
if
((
n
= (
TNamed
*)
envs
->FindObject(
"PROOF_SLAVE_WRAPPERCMD"
)))
1074
wrk
=
n
->GetTitle();
1075
}
1076
if
(
all
!=
""
&&
mst
==
""
)
mst
=
all
;
1077
if
(
all
!=
""
&& top ==
""
) top =
all
;
1078
if
(
all
!=
""
&& sub ==
""
) sub =
all
;
1079
if
(
all
!=
""
&&
wrk
==
""
)
wrk
=
all
;
1080
if
(
all
!=
""
&&
all
.BeginsWith(
"valgrind_opts:"
)) {
1081
// The field is used to add an option Reset the setting
1082
Info
(
"ParseConfigField"
,
"valgrind run: resetting 'PROOF_WRAPPERCMD':"
1083
" must be set again for next run , if any"
);
1084
TProof::DelEnvVar
(
"PROOF_WRAPPERCMD"
);
1085
}
1086
TString
var,
cmd
;
1087
cmd
.Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp"
,
cq
);
1088
TString
mstlab
(
"NO"
),
wrklab
(
"NO"
);
1089
Bool_t
doMaster
= (opt ==
"valgrind"
|| (opt.
Contains
(
"master"
) &&
1090
!opt.
Contains
(
"topmaster"
) && !opt.
Contains
(
"submaster"
)))
1091
?
kTRUE
:
kFALSE
;
1092
if
(
doMaster
) {
1093
if
(!
IsLite
()) {
1094
// Check if we have to add a var
1095
if
(
mst
==
""
||
mst
.BeginsWith(
"valgrind_opts:"
)) {
1096
mst
.ReplaceAll(
"valgrind_opts:"
,
""
);
1097
var.
Form
(
"%s --log-file=<logfilemst>.valgrind.log %s"
,
cmd
.Data(),
mst
.Data());
1098
TProof::AddEnvVar
(
"PROOF_MASTER_WRAPPERCMD"
, var);
1099
mstlab
=
"YES"
;
1100
}
else
if
(
mst
!=
""
) {
1101
mstlab
=
"YES"
;
1102
}
1103
}
else
{
1104
if
(opt.
Contains
(
"master"
)) {
1105
Warning
(
"ParseConfigField"
,
1106
"master valgrinding does not make sense for PROOF-Lite: ignoring"
);
1107
opt.
ReplaceAll
(
"master"
,
""
);
1108
if
(!opt.
Contains
(
"workers"
))
return
;
1109
}
1110
if
(opt ==
"valgrind"
|| opt ==
"valgrind="
) opt =
"valgrind=workers"
;
1111
}
1112
}
1113
if
(opt.
Contains
(
"topmaster"
)) {
1114
// Check if we have to add a var
1115
if
(top ==
""
|| top.
BeginsWith
(
"valgrind_opts:"
)) {
1116
top.
ReplaceAll
(
"valgrind_opts:"
,
""
);
1117
var.
Form
(
"%s --log-file=<logfilemst>.valgrind.log %s"
,
cmd
.Data(), top.
Data
());
1118
TProof::AddEnvVar
(
"PROOF_TOPMASTER_WRAPPERCMD"
, var);
1119
mstlab
=
"YES"
;
1120
}
else
if
(top !=
""
) {
1121
mstlab
=
"YES"
;
1122
}
1123
}
1124
if
(opt.
Contains
(
"submaster"
)) {
1125
// Check if we have to add a var
1126
if
(sub ==
""
|| sub.
BeginsWith
(
"valgrind_opts:"
)) {
1127
sub.
ReplaceAll
(
"valgrind_opts:"
,
""
);
1128
var.
Form
(
"%s --log-file=<logfilemst>.valgrind.log %s"
,
cmd
.Data(), sub.
Data
());
1129
TProof::AddEnvVar
(
"PROOF_SUBMASTER_WRAPPERCMD"
, var);
1130
mstlab
=
"YES"
;
1131
}
else
if
(sub !=
""
) {
1132
mstlab
=
"YES"
;
1133
}
1134
}
1135
if
(opt.
Contains
(
"=workers"
) || opt.
Contains
(
"+workers"
)) {
1136
// Check if we have to add a var
1137
if
(
wrk
==
""
||
wrk
.BeginsWith(
"valgrind_opts:"
)) {
1138
wrk
.ReplaceAll(
"valgrind_opts:"
,
""
);
1139
var.
Form
(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s"
,
cmd
.Data(),
wrk
.Data(),
cq
);
1140
TProof::AddEnvVar
(
"PROOF_SLAVE_WRAPPERCMD"
, var);
1141
TString
nwrks
(
"2"
);
1142
Int_t
inw
= opt.
Index
(
'#'
);
1143
if
(
inw
!=
kNPOS
) {
1144
nwrks
= opt(
inw
+1, opt.
Length
());
1145
if
(!
nwrks
.IsDigit())
nwrks
=
"2"
;
1146
}
1147
// Set the relevant variables
1148
if
(!
IsLite
()) {
1149
TProof::AddEnvVar
(
"PROOF_NWORKERS"
,
nwrks
);
1150
}
else
{
1151
gEnv
->
SetValue
(
"ProofLite.Workers"
,
nwrks
.Atoi());
1152
}
1153
wrklab
=
nwrks
;
1154
// Register the additional worker log in the session file
1155
// (for the master this is done automatically)
1156
TProof::AddEnvVar
(
"PROOF_ADDITIONALLOG"
,
"__valgrind__.log*"
);
1157
}
else
if
(
wrk
!=
""
) {
1158
wrklab
=
"ALL"
;
1159
}
1160
}
1161
// Increase the relevant timeouts
1162
if
(!
IsLite
()) {
1163
TProof::AddEnvVar
(
"PROOF_INTWAIT"
,
"5000"
);
1164
gEnv
->
SetValue
(
"Proof.SocketActivityTimeout"
, 6000);
1165
}
else
{
1166
gEnv
->
SetValue
(
"ProofLite.StartupTimeOut"
, 5000);
1167
}
1168
// Warn for slowness
1169
Printf
(
" "
);
1170
if
(!
IsLite
()) {
1171
Printf
(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)"
,
mstlab
.Data(),
wrklab
.Data());
1172
}
else
{
1173
Printf
(
" ---> Starting a debug run with valgrind (workers:%s)"
,
wrklab
.Data());
1174
}
1175
Printf
(
" ---> Please be patient: startup may be VERY slow ..."
);
1176
Printf
(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) "
);
1177
Printf
(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)"
);
1178
Printf
(
" "
);
1179
1180
}
else
if
(opt.
BeginsWith
(
"igprof-pp"
)) {
1181
1182
// IgProf profiling on master and worker. PROOF does not set the
1183
// environment for you: proper environment variables (like PATH and
1184
// LD_LIBRARY_PATH) should be set externally
1185
1186
Printf
(
"*** Requested IgProf performance profiling ***"
);
1187
TString
addLogExt
=
"__igprof.pp__.log"
;
1188
TString
addLogFmt
=
"igprof -pk -pp -t proofserv.exe -o %s.%s"
;
1189
TString
tmp;
1190
1191
if
(
IsLite
()) {
1192
addLogFmt
.Append(
"\""
);
1193
addLogFmt
.Prepend(
"\""
);
1194
}
1195
1196
tmp.Form(
addLogFmt
.Data(),
"<logfilemst>"
,
addLogExt
.Data());
1197
TProof::AddEnvVar
(
"PROOF_MASTER_WRAPPERCMD"
, tmp.Data());
1198
1199
tmp.Form(
addLogFmt
.Data(),
"<logfilewrk>"
,
addLogExt
.Data());
1200
TProof::AddEnvVar
(
"PROOF_SLAVE_WRAPPERCMD"
, tmp.Data() );
1201
1202
TProof::AddEnvVar
(
"PROOF_ADDITIONALLOG"
,
addLogExt
.Data());
1203
1204
}
else
if
(opt.
BeginsWith
(
"cpupin="
)) {
1205
// Enable CPU pinning. Takes as argument the list of processor IDs
1206
// that will be used in order. Processor IDs are numbered from 0,
1207
// use likwid to see how they are organized. A possible parameter
1208
// format would be:
1209
//
1210
// cpupin=3+4+0+9+10+22+7
1211
//
1212
// Only the specified processor IDs will be used in a round-robin
1213
// fashion, dealing with the fact that you can request more workers
1214
// than the number of processor IDs you have specified.
1215
//
1216
// To use all available processors in their order:
1217
//
1218
// cpupin=*
1219
1220
opt.
Remove
(0, 7);
1221
1222
// Remove any char which is neither a number nor a plus '+'
1223
for
(
Ssiz_t
i=0; i<opt.
Length
(); i++) {
1224
Char_t
c
= opt[i];
1225
if
((
c
!=
'+'
) && ((
c
<
'0'
) || (
c
>
'9'
)))
1226
opt[i] =
'_'
;
1227
}
1228
opt.
ReplaceAll
(
"_"
,
""
);
1229
TProof::AddEnvVar
(
"PROOF_SLAVE_CPUPIN_ORDER"
, opt);
1230
#ifdef R__LINUX
1231
cpuPin
=
kTRUE
;
1232
#endif
1233
}
else
if
(opt.
BeginsWith
(
"workers="
)) {
1234
1235
// Request for a given number of workers (within the max) or worker
1236
// startup combination:
1237
// workers=5 start max 5 workers (or less, if less are assigned)
1238
// workers=2x start max 2 workers per node (or less, if less are assigned)
1239
opt.
ReplaceAll
(
"workers="
,
""
);
1240
TProof::AddEnvVar
(
"PROOF_NWORKERS"
, opt);
1241
}
1242
}
1243
1244
// In case of PROOF-Lite, enable CPU pinning when requested (Linux only)
1245
#ifdef R__LINUX
1246
if
(
IsLite
() &&
cpuPin
) {
1247
Printf
(
"*** Requested CPU pinning ***"
);
1248
const
TList
*
ev
=
GetEnvVars
();
1249
const
char
*
pinCmd
=
"taskset -c <cpupin>"
;
1250
TString
val;
1251
TNamed
*
p
;
1252
if
(
ev
&& (
p
=
dynamic_cast<
TNamed
*
>
(
ev
->FindObject(
"PROOF_SLAVE_WRAPPERCMD"
)))) {
1253
val =
p
->GetTitle();
1254
val.
Insert
(val.
Length
()-1,
" "
);
1255
val.
Insert
(val.
Length
()-1,
pinCmd
);
1256
}
1257
else
{
1258
val.
Form
(
"\"%s\""
,
pinCmd
);
1259
}
1260
TProof::AddEnvVar
(
"PROOF_SLAVE_WRAPPERCMD"
, val.
Data
());
1261
}
1262
#endif
1263
}
1264
1265
////////////////////////////////////////////////////////////////////////////////
1266
/// Make sure that 'path' exists; if 'writable' is kTRUE, make also sure
1267
/// that the path is writable
1268
1269
Int_t
TProof::AssertPath
(
const
char
*
inpath
,
Bool_t
writable
)
1270
{
1271
if
(!
inpath
||
strlen
(
inpath
) <= 0) {
1272
Error
(
"AssertPath"
,
"undefined input path"
);
1273
return
-1;
1274
}
1275
1276
TString
path(
inpath
);
1277
gSystem
->
ExpandPathName
(path);
1278
1279
if
(
gSystem
->
AccessPathName
(path,
kFileExists
)) {
1280
if
(
gSystem
->
mkdir
(path,
kTRUE
) != 0) {
1281
Error
(
"AssertPath"
,
"could not create path %s"
, path.
Data
());
1282
return
-1;
1283
}
1284
}
1285
// It must be writable
1286
if
(
gSystem
->
AccessPathName
(path,
kWritePermission
) &&
writable
) {
1287
if
(
gSystem
->
Chmod
(path, 0666) != 0) {
1288
Error
(
"AssertPath"
,
"could not make path %s writable"
, path.
Data
());
1289
return
-1;
1290
}
1291
}
1292
1293
// Done
1294
return
0;
1295
}
1296
1297
////////////////////////////////////////////////////////////////////////////////
1298
/// Set manager and schedule its destruction after this for clean
1299
/// operations.
1300
1301
void
TProof::SetManager
(
TProofMgr
*
mgr
)
1302
{
1303
fManager
=
mgr
;
1304
1305
if
(
mgr
) {
1306
R__LOCKGUARD
(
gROOTMutex
);
1307
gROOT
->GetListOfSockets()->Remove(
mgr
);
1308
gROOT
->GetListOfSockets()->Add(
mgr
);
1309
}
1310
}
1311
1312
////////////////////////////////////////////////////////////////////////////////
1313
/// Works on the master node only.
1314
/// It starts workers on the machines in workerList and sets the paths,
1315
/// packages and macros as on the master.
1316
/// It is a subbstitute for StartSlaves(...)
1317
/// The code is mostly the master part of StartSlaves,
1318
/// with the parallel startup removed.
1319
1320
Int_t
TProof::AddWorkers
(
TList
*
workerList
)
1321
{
1322
if
(!
IsMaster
()) {
1323
Error
(
"AddWorkers"
,
"AddWorkers can only be called on the master!"
);
1324
return
-1;
1325
}
1326
1327
if
(!
workerList
|| !(
workerList
->GetSize())) {
1328
Error
(
"AddWorkers"
,
"empty list of workers!"
);
1329
return
-2;
1330
}
1331
1332
// Code taken from master part of StartSlaves with the parllel part removed
1333
1334
fImage
=
gProofServ
->
GetImage
();
1335
if
(
fImage
.
IsNull
())
1336
fImage
.
Form
(
"%s:%s"
,
TUrl
(
gSystem
->
HostName
()).
GetHostFQDN
(),
gProofServ
->
GetWorkDir
());
1337
1338
// Get all workers
1339
UInt_t
nSlaves
=
workerList
->GetSize();
1340
UInt_t
nSlavesDone
= 0;
1341
Int_t
ord
= 0;
1342
1343
// Loop over all new workers and start them (if we had already workers it means we are
1344
// increasing parallelism or that is not the first time we are called)
1345
Bool_t
goMoreParallel
= (
fSlaves
->
GetEntries
() > 0) ?
kTRUE
:
kFALSE
;
1346
1347
// A list of TSlave objects for workers that are being added
1348
TList
*
addedWorkers
=
new
TList
();
1349
if
(!
addedWorkers
) {
1350
// This is needed to silence Coverity ...
1351
Error
(
"AddWorkers"
,
"cannot create new list for the workers to be added"
);
1352
return
-2;
1353
}
1354
addedWorkers
->SetOwner(
kFALSE
);
1355
TListIter
next(
workerList
);
1356
TObject
*to;
1357
TProofNodeInfo
*worker;
1358
TSlaveInfo
*
dummysi
=
new
TSlaveInfo
();
1359
while
((to = next())) {
1360
// Get the next worker from the list
1361
worker = (
TProofNodeInfo
*)to;
1362
1363
// Read back worker node info
1364
const
Char_t
*
image
= worker->
GetImage
().Data();
1365
const
Char_t
*
workdir
= worker->
GetWorkDir
().Data();
1366
Int_t
perfidx
= worker->
GetPerfIndex
();
1367
Int_t
sport
= worker->
GetPort
();
1368
if
(
sport
== -1)
1369
sport
=
fUrl
.
GetPort
();
1370
1371
// Create worker server
1372
TString
fullord
;
1373
if
(worker->
GetOrdinal
().Length() > 0) {
1374
fullord
.Form(
"%s.%s"
,
gProofServ
->
GetOrdinal
(), worker->
GetOrdinal
().Data());
1375
}
else
{
1376
fullord
.Form(
"%s.%d"
,
gProofServ
->
GetOrdinal
(),
ord
);
1377
}
1378
1379
// Remove worker from the list of workers terminated gracefully
1380
dummysi
->SetOrdinal(
fullord
);
1381
TSlaveInfo
*
rmsi
= (
TSlaveInfo
*)
fTerminatedSlaveInfos
->
Remove
(
dummysi
);
1382
SafeDelete
(
rmsi
);
1383
1384
// Create worker server
1385
TString
wn
(worker->
GetNodeName
());
1386
if
(
wn
==
"localhost"
||
wn
.BeginsWith(
"localhost."
))
wn
=
gSystem
->
HostName
();
1387
TUrl
u
(
TString::Format
(
"%s:%d"
,
wn
.Data(),
sport
));
1388
// Add group info in the password firdl, if any
1389
if
(
strlen
(
gProofServ
->
GetGroup
()) > 0) {
1390
// Set also the user, otherwise the password is not exported
1391
if
(
strlen
(
u
.GetUser()) <= 0)
1392
u
.SetUser(
gProofServ
->
GetUser
());
1393
u
.SetPasswd(
gProofServ
->
GetGroup
());
1394
}
1395
TSlave
*
slave
= 0;
1396
if
(worker->
IsWorker
()) {
1397
slave
=
CreateSlave
(
u
.GetUrl(),
fullord
,
perfidx
,
image
,
workdir
);
1398
}
else
{
1399
slave
=
CreateSubmaster
(
u
.GetUrl(),
fullord
,
1400
image
, worker->
GetMsd
(), worker->
GetNWrks
());
1401
}
1402
1403
// Add to global list (we will add to the monitor list after
1404
// finalizing the server startup)
1405
Bool_t
slaveOk
=
kTRUE
;
1406
fSlaves
->
Add
(
slave
);
1407
if
(
slave
->IsValid()) {
1408
addedWorkers
->Add(
slave
);
1409
}
else
{
1410
slaveOk
=
kFALSE
;
1411
fBadSlaves
->
Add
(
slave
);
1412
Warning
(
"AddWorkers"
,
"worker '%s' is invalid"
,
slave
->GetOrdinal());
1413
}
1414
1415
PDB
(kGlobal,3)
1416
Info
(
"AddWorkers"
,
"worker on host %s created"
1417
" and added to list (ord: %s)"
, worker->
GetName
(),
slave
->GetOrdinal());
1418
1419
// Notify opening of connection
1420
nSlavesDone
++;
1421
TMessage
m
(
kPROOF_SERVERSTARTED
);
1422
m
<<
TString
(
"Opening connections to workers"
) <<
nSlaves
1423
<<
nSlavesDone
<<
slaveOk
;
1424
gProofServ
->
GetSocket
()->
Send
(
m
);
1425
1426
ord
++;
1427
}
//end of the worker loop
1428
SafeDelete
(
dummysi
);
1429
1430
// Cleanup
1431
SafeDelete
(
workerList
);
1432
1433
nSlavesDone
= 0;
1434
1435
// Here we finalize the server startup: in this way the bulk
1436
// of remote operations are almost parallelized
1437
TIter
nxsl
(
addedWorkers
);
1438
TSlave
*
sl
= 0;
1439
while
((
sl
= (
TSlave
*)
nxsl
())) {
1440
1441
// Finalize setup of the server
1442
if
(
sl
->IsValid())
1443
sl
->SetupServ(
TSlave::kSlave
, 0);
1444
1445
// Monitor good slaves
1446
Bool_t
slaveOk
=
kTRUE
;
1447
if
(
sl
->IsValid()) {
1448
fAllMonitor
->
Add
(
sl
->GetSocket());
1449
PDB
(kGlobal,3)
1450
Info
(
"AddWorkers"
,
"worker on host %s finalized"
1451
" and added to list"
,
sl
->GetOrdinal());
1452
}
else
{
1453
slaveOk
=
kFALSE
;
1454
fBadSlaves
->
Add
(
sl
);
1455
}
1456
1457
// Notify end of startup operations
1458
nSlavesDone
++;
1459
TMessage
m
(
kPROOF_SERVERSTARTED
);
1460
m
<<
TString
(
"Setting up worker servers"
) <<
nSlaves
1461
<<
nSlavesDone
<<
slaveOk
;
1462
gProofServ
->
GetSocket
()->
Send
(
m
);
1463
}
1464
1465
// Now set new state on the added workers (on all workers for simplicity)
1466
// use fEnabledPackages, fLoadedMacros,
1467
// gSystem->GetDynamicPath() and gSystem->GetIncludePath()
1468
// no need to load packages that are only loaded and not enabled (dyn mode)
1469
Int_t
nwrk
=
GetRemoteProtocol
() > 35 ? -1 : 9999;
1470
TNamed
*
n
= 0;
1471
if
(
TProof::GetEnvVars
() &&
1472
(
n
= (
TNamed
*)
TProof::GetEnvVars
()->
FindObject
(
"PROOF_NWORKERS"
))) {
1473
TString
s(
n
->GetTitle());
1474
if
(s.
IsDigit
())
nwrk
= s.
Atoi
();
1475
}
1476
1477
if
(
fDynamicStartup
&&
goMoreParallel
) {
1478
1479
PDB
(kGlobal, 3)
1480
Info
(
"AddWorkers"
,
"will invoke GoMoreParallel()"
);
1481
Int_t
nw
=
GoMoreParallel
(
nwrk
);
1482
PDB
(kGlobal, 3)
1483
Info
(
"AddWorkers"
,
"GoMoreParallel()=%d"
,
nw
);
1484
1485
}
1486
else
{
1487
// Not in Dynamic Workers mode
1488
PDB
(kGlobal, 3)
1489
Info
(
"AddWorkers"
,
"will invoke GoParallel()"
);
1490
GoParallel
(
nwrk
,
kFALSE
, 0);
1491
}
1492
1493
// Set worker processing environment
1494
SetupWorkersEnv
(
addedWorkers
,
goMoreParallel
);
1495
1496
// Update list of current workers
1497
PDB
(kGlobal, 3)
1498
Info
(
"AddWorkers"
,
"will invoke SaveWorkerInfo()"
);
1499
SaveWorkerInfo
();
1500
1501
// Inform the client that the number of workers has changed
1502
if
(
fDynamicStartup
&&
gProofServ
) {
1503
PDB
(kGlobal, 3)
1504
Info
(
"AddWorkers"
,
"will invoke SendParallel()"
);
1505
gProofServ
->
SendParallel
(
kTRUE
);
1506
1507
if
(
goMoreParallel
&&
fPlayer
) {
1508
// In case we are adding workers dynamically to an existing process, we
1509
// should invoke a special player's Process() to set only added workers
1510
// to the proper state
1511
PDB
(kGlobal, 3)
1512
Info
(
"AddWorkers"
,
"will send the PROCESS message to selected workers"
);
1513
fPlayer
->
JoinProcess
(
addedWorkers
);
1514
// Update merger counters (new workers are not yet active)
1515
fMergePrg
.
SetNWrks
(
fActiveSlaves
->
GetSize
() +
addedWorkers
->GetSize());
1516
}
1517
}
1518
1519
// Cleanup
1520
delete
addedWorkers
;
1521
1522
return
0;
1523
}
1524
1525
////////////////////////////////////////////////////////////////////////////////
1526
/// Set up packages, loaded macros, include and lib paths ...
1527
1528
void
TProof::SetupWorkersEnv
(
TList
*
addedWorkers
,
Bool_t
increasingWorkers
)
1529
{
1530
TList
*
server_packs
=
gProofServ
?
gProofServ
->
GetEnabledPackages
() :
nullptr
;
1531
// Packages
1532
TList
*
packs
=
server_packs
?
server_packs
:
GetEnabledPackages
();
1533
if
(
packs
&&
packs
->GetSize() > 0) {
1534
TIter
nxp
(
packs
);
1535
TPair
*
pck
= 0;
1536
while
((
pck
= (
TPair
*)
nxp
())) {
1537
// Upload and Enable methods are intelligent and avoid
1538
// re-uploading or re-enabling of a package to a node that has it.
1539
if
(
fDynamicStartup
&&
increasingWorkers
) {
1540
// Upload only on added workers
1541
PDB
(kGlobal, 3)
1542
Info
(
"SetupWorkersEnv"
,
"will invoke UploadPackage() and EnablePackage() on added workers"
);
1543
if
(
UploadPackage
(
pck
->GetName(),
kUntar
,
addedWorkers
) >= 0)
1544
EnablePackage
(
pck
->GetName(), (
TList
*)
pck
->Value(),
kTRUE
,
addedWorkers
);
1545
}
else
{
1546
PDB
(kGlobal, 3)
1547
Info
(
"SetupWorkersEnv"
,
"will invoke UploadPackage() and EnablePackage() on all workers"
);
1548
if
(
UploadPackage
(
pck
->GetName()) >= 0)
1549
EnablePackage
(
pck
->GetName(), (
TList
*)
pck
->Value(),
kTRUE
);
1550
}
1551
}
1552
}
1553
1554
if
(
server_packs
) {
1555
server_packs
->Delete();
1556
delete
server_packs
;
1557
}
1558
1559
// Loaded macros
1560
if
(
fLoadedMacros
) {
1561
TIter
nxp
(
fLoadedMacros
);
1562
TObjString
*os = 0;
1563
while
((os = (
TObjString
*)
nxp
())) {
1564
PDB
(kGlobal, 3) {
1565
Info
(
"SetupWorkersEnv"
,
"will invoke Load() on selected workers"
);
1566
Printf
(
"Loading a macro : %s"
, os->GetName());
1567
}
1568
Load
(os->GetName(),
kTRUE
,
kTRUE
,
addedWorkers
);
1569
}
1570
}
1571
1572
// Dynamic path
1573
TString
dyn
=
gSystem
->
GetDynamicPath
();
1574
dyn
.ReplaceAll(
":"
,
" "
);
1575
dyn
.ReplaceAll(
"\""
,
" "
);
1576
PDB
(kGlobal, 3)
1577
Info
(
"SetupWorkersEnv"
,
"will invoke AddDynamicPath() on selected workers"
);
1578
AddDynamicPath
(
dyn
,
kFALSE
,
addedWorkers
,
kFALSE
);
// Do not Collect
1579
1580
// Include path
1581
TString
inc
=
gSystem
->
GetIncludePath
();
1582
inc
.ReplaceAll(
"-I"
,
" "
);
1583
inc
.ReplaceAll(
"\""
,
" "
);
1584
PDB
(kGlobal, 3)
1585
Info
(
"SetupWorkersEnv"
,
"will invoke AddIncludePath() on selected workers"
);
1586
AddIncludePath
(
inc
,
kFALSE
,
addedWorkers
,
kFALSE
);
// Do not Collect
1587
1588
// Done
1589
return
;
1590
}
1591
1592
////////////////////////////////////////////////////////////////////////////////
1593
/// Used for shuting down the workres after a query is finished.
1594
/// Sends each of the workers from the workerList, a kPROOF_STOP message.
1595
/// If the workerList == 0, shutdown all the workers.
1596
1597
Int_t
TProof::RemoveWorkers
(
TList
*
workerList
)
1598
{
1599
if
(!
IsMaster
()) {
1600
Error
(
"RemoveWorkers"
,
"RemoveWorkers can only be called on the master!"
);
1601
return
-1;
1602
}
1603
1604
fFileMap
.clear();
// This could be avoided if CopyFromCache was used in SendFile
1605
1606
if
(!
workerList
) {
1607
// shutdown all the workers
1608
TIter
nxsl
(
fSlaves
);
1609
TSlave
*
sl
= 0;
1610
while
((
sl
= (
TSlave
*)
nxsl
())) {
1611
// Shut down the worker assumig that it is not processing
1612
TerminateWorker
(
sl
);
1613
}
1614
1615
}
else
{
1616
if
(!(
workerList
->GetSize())) {
1617
Error
(
"RemoveWorkers"
,
"The list of workers should not be empty!"
);
1618
return
-2;
1619
}
1620
1621
// Loop over all the workers and stop them
1622
TListIter
next(
workerList
);
1623
TObject
*to;
1624
TProofNodeInfo
*worker;
1625
while
((to = next())) {
1626
TSlave
*
sl
= 0;
1627
if
(!
strcmp
(to->
ClassName
(),
"TProofNodeInfo"
)) {
1628
// Get the next worker from the list
1629
worker = (
TProofNodeInfo
*)to;
1630
TIter
nxsl
(
fSlaves
);
1631
while
((
sl
= (
TSlave
*)
nxsl
())) {
1632
// Shut down the worker assumig that it is not processing
1633
if
(
sl
->GetName() == worker->
GetNodeName
())
1634
break
;
1635
}
1636
}
else
if
(to->
InheritsFrom
(
TSlave::Class
())) {
1637
sl
= (
TSlave
*) to;
1638
}
else
{
1639
Warning
(
"RemoveWorkers"
,
"unknown object type: %s - it should be"
1640
" TProofNodeInfo or inheriting from TSlave"
, to->
ClassName
());
1641
}
1642
// Shut down the worker assumig that it is not processing
1643
if
(
sl
) {
1644
if
(
gDebug
> 0)
1645
Info
(
"RemoveWorkers"
,
"terminating worker %s"
,
sl
->GetOrdinal());
1646
TerminateWorker
(
sl
);
1647
}
1648
}
1649
}
1650
1651
// Update also the master counter
1652
if
(
gProofServ
&&
fSlaves
->
GetSize
() <= 0)
gProofServ
->
ReleaseWorker
(
"master"
);
1653
1654
return
0;
1655
}
1656
1657
////////////////////////////////////////////////////////////////////////////////
1658
/// Start up PROOF slaves.
1659
1660
Bool_t
TProof::StartSlaves
(
Bool_t
attach)
1661
{
1662
// If this is a master server, find the config file and start slave
1663
// servers as specified in the config file
1664
if
(
TestBit
(
TProof::kIsMaster
)) {
1665
1666
Int_t
pc = 0;
1667
TList
*
workerList
=
new
TList
;
1668
// Get list of workers
1669
if
(
gProofServ
->
GetWorkers
(
workerList
, pc) ==
TProofServ::kQueryStop
) {
1670
TString
emsg
(
"no resource currently available for this session: please retry later"
);
1671
if
(
gDebug
> 0)
Info
(
"StartSlaves"
,
"%s"
,
emsg
.Data());
1672
gProofServ
->
SendAsynMessage
(
emsg
.Data());
1673
return
kFALSE
;
1674
}
1675
// Setup the workers
1676
if
(
AddWorkers
(
workerList
) < 0)
1677
return
kFALSE
;
1678
1679
}
else
{
1680
1681
// create master server
1682
Printf
(
"Starting master: opening connection ..."
);
1683
TSlave
*
slave
=
CreateSubmaster
(
fUrl
.
GetUrl
(),
"0"
,
"master"
, 0);
1684
1685
if
(
slave
->IsValid()) {
1686
1687
// Notify
1688
fprintf
(
stderr
,
"Starting master:"
1689
" connection open: setting up server ... \r"
);
1690
StartupMessage
(
"Connection to master opened"
,
kTRUE
, 1, 1);
1691
1692
if
(!attach) {
1693
1694
// Set worker interrupt handler
1695
slave
->SetInterruptHandler(
kTRUE
);
1696
1697
// Finalize setup of the server
1698
slave
->SetupServ(
TSlave::kMaster
,
fConfFile
);
1699
1700
if
(
slave
->IsValid()) {
1701
1702
// Notify
1703
Printf
(
"Starting master: OK "
);
1704
StartupMessage
(
"Master started"
,
kTRUE
, 1, 1);
1705
1706
// check protocol compatibility
1707
// protocol 1 is not supported anymore
1708
if
(
fProtocol
== 1) {
1709
Error
(
"StartSlaves"
,
1710
"client and remote protocols not compatible (%d and %d)"
,
1711
kPROOF_Protocol
,
fProtocol
);
1712
slave
->Close(
"S"
);
1713
delete
slave
;
1714
return
kFALSE
;
1715
}
1716
1717
fSlaves
->
Add
(
slave
);
1718
fAllMonitor
->
Add
(
slave
->GetSocket());
1719
1720
// Unset worker interrupt handler
1721
slave
->SetInterruptHandler(
kFALSE
);
1722
1723
// Set interrupt PROOF handler from now on
1724
fIntHandler
=
new
TProofInterruptHandler
(
this
);
1725
1726
// Give-up after 5 minutes
1727
Int_t
rc
=
Collect
(
slave
, 300);
1728
Int_t
slStatus
=
slave
->GetStatus();
1729
if
(
slStatus
== -99 ||
slStatus
== -98 ||
rc
== 0) {
1730
fSlaves
->
Remove
(
slave
);
1731
fAllMonitor
->
Remove
(
slave
->GetSocket());
1732
if
(
slStatus
== -99)
1733
Error
(
"StartSlaves"
,
"no resources available or problems setting up workers (check logs)"
);
1734
else
if
(
slStatus
== -98)
1735
Error
(
"StartSlaves"
,
"could not setup output redirection on master"
);
1736
else
1737
Error
(
"StartSlaves"
,
"setting up master"
);
1738
slave
->Close(
"S"
);
1739
delete
slave
;
1740
return
0;
1741
}
1742
1743
if
(!
slave
->IsValid()) {
1744
fSlaves
->
Remove
(
slave
);
1745
fAllMonitor
->
Remove
(
slave
->GetSocket());
1746
slave
->Close(
"S"
);
1747
delete
slave
;
1748
Error
(
"StartSlaves"
,
1749
"failed to setup connection with PROOF master server"
);
1750
return
kFALSE
;
1751
}
1752
1753
if
(!
gROOT
->IsBatch() &&
TestBit
(
kUseProgressDialog
)) {
1754
if
((
fProgressDialog
=
1755
gROOT
->GetPluginManager()->FindHandler(
"TProofProgressDialog"
)))
1756
if
(
fProgressDialog
->
LoadPlugin
() == -1)
1757
fProgressDialog
= 0;
1758
}
1759
}
else
{
1760
// Notify
1761
Printf
(
"Starting master: failure"
);
1762
}
1763
}
else
{
1764
1765
// Notify
1766
Printf
(
"Starting master: OK "
);
1767
StartupMessage
(
"Master attached"
,
kTRUE
, 1, 1);
1768
1769
if
(!
gROOT
->IsBatch() &&
TestBit
(
kUseProgressDialog
)) {
1770
if
((
fProgressDialog
=
1771
gROOT
->GetPluginManager()->FindHandler(
"TProofProgressDialog"
)))
1772
if
(
fProgressDialog
->
LoadPlugin
() == -1)
1773
fProgressDialog
= 0;
1774
}
1775
1776
fSlaves
->
Add
(
slave
);
1777
fIntHandler
=
new
TProofInterruptHandler
(
this
);
1778
}
1779
1780
}
else
{
1781
delete
slave
;
1782
// Notify only if verbosity is on: most likely the failure has already been notified
1783
if
(
gDebug
> 0)
1784
Error
(
"StartSlaves"
,
"failed to create (or connect to) the PROOF master server"
);
1785
return
kFALSE
;
1786
}
1787
}
1788
1789
return
kTRUE
;
1790
}
1791
1792
////////////////////////////////////////////////////////////////////////////////
1793
/// Close all open slave servers.
1794
/// Client can decide to shutdown the remote session by passing option is 'S'
1795
/// or 's'. Default for clients is detach, if supported. Masters always
1796
/// shutdown the remote counterpart.
1797
1798
void
TProof::Close
(
Option_t
*opt)
1799
{
1800
{ std::lock_guard<std::recursive_mutex> lock(
fCloseMutex
);
1801
1802
fValid
=
kFALSE
;
1803
if
(
fSlaves
) {
1804
if
(
fIntHandler
)
1805
fIntHandler
->
Remove
();
1806
1807
TIter
nxs
(
fSlaves
);
1808
TSlave
*
sl
= 0;
1809
while
((
sl
= (
TSlave
*)
nxs
()))
1810
sl
->Close(opt);
1811
1812
fActiveSlaves
->
Clear
(
"nodelete"
);
1813
fUniqueSlaves
->
Clear
(
"nodelete"
);
1814
fAllUniqueSlaves
->
Clear
(
"nodelete"
);
1815
fNonUniqueMasters
->
Clear
(
"nodelete"
);
1816
fBadSlaves
->
Clear
(
"nodelete"
);
1817
fInactiveSlaves
->
Clear
(
"nodelete"
);
1818
fSlaves
->
Delete
();
1819
}
1820
}
1821
1822
{
R__LOCKGUARD
(
gROOTMutex
);
1823
gROOT
->GetListOfSockets()->Remove(
this
);
1824
1825
if
(
fChains
) {
1826
while
(
TChain
*
chain
=
dynamic_cast<
TChain
*
>
(
fChains
->
First
()) ) {
1827
// remove "chain" from list
1828
chain
->SetProof(0);
1829
RemoveChain
(
chain
);
1830
}
1831
}
1832
1833
if
(
IsProofd
()) {
1834
1835
gROOT
->GetListOfProofs()->Remove(
this
);
1836
if
(
gProof
&&
gProof
==
this
) {
1837
// Set previous proofd-related as default
1838
TIter
pvp
(
gROOT
->GetListOfProofs(),
kIterBackward
);
1839
while
((
gProof
= (
TProof
*)
pvp
())) {
1840
if
(
gProof
->
IsProofd
())
1841
break
;
1842
}
1843
}
1844
}
1845
}
1846
}
1847
1848
////////////////////////////////////////////////////////////////////////////////
1849
/// Create a new TSlave of type TSlave::kSlave.
1850
/// Note: creation of TSlave is private with TProof as a friend.
1851
/// Derived classes must use this function to create slaves.
1852
1853
TSlave
*
TProof::CreateSlave
(
const
char
*
url
,
const
char
*
ord
,
1854
Int_t
perf
,
const
char
*
image
,
const
char
*
workdir
)
1855
{
1856
TSlave
*
sl
=
TSlave::Create
(
url
,
ord
,
perf
,
image
,
1857
this
,
TSlave::kSlave
,
workdir
, 0);
1858
1859
if
(
sl
->IsValid()) {
1860
sl
->SetInputHandler(
new
TProofInputHandler
(
this
,
sl
->GetSocket()));
1861
// must set fParallel to 1 for slaves since they do not
1862
// report their fParallel with a LOG_DONE message
1863
sl
->fParallel = 1;
1864
}
1865
1866
return
sl
;
1867
}
1868
1869
1870
////////////////////////////////////////////////////////////////////////////////
1871
/// Create a new TSlave of type TSlave::kMaster.
1872
/// Note: creation of TSlave is private with TProof as a friend.
1873
/// Derived classes must use this function to create slaves.
1874
1875
TSlave
*
TProof::CreateSubmaster
(
const
char
*
url
,
const
char
*
ord
,
1876
const
char
*
image
,
const
char
*
msd
,
Int_t
nwk
)
1877
{
1878
TSlave
*
sl
=
TSlave::Create
(
url
,
ord
, 100,
image
,
this
,
1879
TSlave::kMaster
, 0,
msd
,
nwk
);
1880
1881
if
(
sl
->IsValid()) {
1882
sl
->SetInputHandler(
new
TProofInputHandler
(
this
,
sl
->GetSocket()));
1883
}
1884
1885
return
sl
;
1886
}
1887
1888
////////////////////////////////////////////////////////////////////////////////
1889
/// Find slave that has TSocket s. Returns 0 in case slave is not found.
1890
1891
TSlave
*
TProof::FindSlave
(
TSocket
*s)
const
1892
{
1893
TSlave
*
sl
;
1894
TIter
next(
fSlaves
);
1895
1896
while
((
sl
= (
TSlave
*)next())) {
1897
if
(
sl
->IsValid() &&
sl
->GetSocket() == s)
1898
return
sl
;
1899
}
1900
return
0;
1901
}
1902
1903
////////////////////////////////////////////////////////////////////////////////
1904
/// Add to the fUniqueSlave list the active slaves that have a unique
1905
/// (user) file system image. This information is used to transfer files
1906
/// only once to nodes that share a file system (an image). Submasters
1907
/// which are not in fUniqueSlaves are put in the fNonUniqueMasters
1908
/// list. That list is used to trigger the transferring of files to
1909
/// the submaster's unique slaves without the need to transfer the file
1910
/// to the submaster.
1911
1912
void
TProof::FindUniqueSlaves
()
1913
{
1914
fUniqueSlaves
->
Clear
();
1915
fUniqueMonitor
->
RemoveAll
();
1916
fAllUniqueSlaves
->
Clear
();
1917
fAllUniqueMonitor
->
RemoveAll
();
1918
fNonUniqueMasters
->
Clear
();
1919
1920
TIter
next(
fActiveSlaves
);
1921
1922
while
(
TSlave
*
sl
=
dynamic_cast<
TSlave
*
>
(next())) {
1923
if
(
fImage
==
sl
->fImage) {
1924
if
(
sl
->GetSlaveType() ==
TSlave::kMaster
) {
1925
fNonUniqueMasters
->
Add
(
sl
);
1926
fAllUniqueSlaves
->
Add
(
sl
);
1927
fAllUniqueMonitor
->
Add
(
sl
->GetSocket());
1928
}
1929
continue
;
1930
}
1931
1932
TIter
next2
(
fUniqueSlaves
);
1933
TSlave
*
replace_slave
= 0;
1934
Bool_t
add =
kTRUE
;
1935
while
(
TSlave
*
sl2
=
dynamic_cast<
TSlave
*
>
(
next2
())) {
1936
if
(
sl
->fImage ==
sl2
->fImage) {
1937
add =
kFALSE
;
1938
if
(
sl
->GetSlaveType() ==
TSlave::kMaster
) {
1939
if
(
sl2
->GetSlaveType() ==
TSlave::kSlave
) {
1940
// give preference to master
1941
replace_slave
=
sl2
;
1942
add =
kTRUE
;
1943
}
else
if
(
sl2
->GetSlaveType() ==
TSlave::kMaster
) {
1944
fNonUniqueMasters
->
Add
(
sl
);
1945
fAllUniqueSlaves
->
Add
(
sl
);
1946
fAllUniqueMonitor
->
Add
(
sl
->GetSocket());
1947
}
else
{
1948
Error
(
"FindUniqueSlaves"
,
"TSlave is neither Master nor Slave"
);
1949
R__ASSERT
(0);
1950
}
1951
}
1952
break
;
1953
}
1954
}
1955
1956
if
(add) {
1957
fUniqueSlaves
->
Add
(
sl
);
1958
fAllUniqueSlaves
->
Add
(
sl
);
1959
fUniqueMonitor
->
Add
(
sl
->GetSocket());
1960
fAllUniqueMonitor
->
Add
(
sl
->GetSocket());
1961
if
(
replace_slave
) {
1962
fUniqueSlaves
->
Remove
(
replace_slave
);
1963
fAllUniqueSlaves
->
Remove
(
replace_slave
);
1964
fUniqueMonitor
->
Remove
(
replace_slave
->GetSocket());
1965
fAllUniqueMonitor
->
Remove
(
replace_slave
->GetSocket());
1966
}
1967
}
1968
}
1969
1970
// will be actiavted in Collect()
1971
fUniqueMonitor
->
DeActivateAll
();
1972
fAllUniqueMonitor
->
DeActivateAll
();
1973
}
1974
1975
////////////////////////////////////////////////////////////////////////////////
1976
/// Return number of slaves as described in the config file.
1977
1978
Int_t
TProof::GetNumberOfSlaves
()
const
1979
{
1980
return
fSlaves
->
GetSize
();
1981
}
1982
1983
////////////////////////////////////////////////////////////////////////////////
1984
/// Return number of active slaves, i.e. slaves that are valid and in
1985
/// the current computing group.
1986
1987
Int_t
TProof::GetNumberOfActiveSlaves
()
const
1988
{
1989
return
fActiveSlaves
->
GetSize
();
1990
}
1991
1992
////////////////////////////////////////////////////////////////////////////////
1993
/// Return number of inactive slaves, i.e. slaves that are valid but not in
1994
/// the current computing group.
1995
1996
Int_t
TProof::GetNumberOfInactiveSlaves
()
const
1997
{
1998
return
fInactiveSlaves
->
GetSize
();
1999
}
2000
2001
////////////////////////////////////////////////////////////////////////////////
2002
/// Return number of unique slaves, i.e. active slaves that have each a
2003
/// unique different user files system.
2004
2005
Int_t
TProof::GetNumberOfUniqueSlaves
()
const
2006
{
2007
return
fUniqueSlaves
->
GetSize
();
2008
}
2009
2010
////////////////////////////////////////////////////////////////////////////////
2011
/// Return number of bad slaves. This are slaves that we in the config
2012
/// file, but refused to startup or that died during the PROOF session.
2013
2014
Int_t
TProof::GetNumberOfBadSlaves
()
const
2015
{
2016
return
fBadSlaves
->
GetSize
();
2017
}
2018
2019
////////////////////////////////////////////////////////////////////////////////
2020
/// Ask the for the statistics of the slaves.
2021
2022
void
TProof::AskStatistics
()
2023
{
2024
if
(!
IsValid
())
return
;
2025
2026
Broadcast
(
kPROOF_GETSTATS
,
kActive
);
2027
Collect
(
kActive
,
fCollectTimeout
);
2028
}
2029
2030
////////////////////////////////////////////////////////////////////////////////
2031
/// Get statistics about CPU time, real time and bytes read.
2032
/// If verbose, print the resuls (always available via GetCpuTime(), GetRealTime()
2033
/// and GetBytesRead()
2034
2035
void
TProof::GetStatistics
(
Bool_t
verbose)
2036
{
2037
if
(
fProtocol
> 27) {
2038
// This returns the correct result
2039
AskStatistics
();
2040
}
else
{
2041
// AskStatistics is buggy: parse the output of Print()
2042
RedirectHandle_t
rh
;
2043
gSystem
->
RedirectOutput
(
fLogFileName
,
"a"
, &
rh
);
2044
Print
();
2045
gSystem
->
RedirectOutput
(0, 0, &
rh
);
2046
TMacro
*
mp
=
GetLastLog
();
2047
if
(
mp
) {
2048
// Look for global directories
2049
TIter
nxl
(
mp
->GetListOfLines());
2050
TObjString
*os = 0;
2051
while
((os = (
TObjString
*)
nxl
())) {
2052
TString
s(os->GetName());
2053
if
(s.
Contains
(
"Total MB's processed:"
)) {
2054
s.
ReplaceAll
(
"Total MB's processed:"
,
""
);
2055
if
(s.
IsFloat
())
fBytesRead
= (
Long64_t
) s.
Atof
() * (1024*1024);
2056
}
else
if
(s.
Contains
(
"Total real time used (s):"
)) {
2057
s.
ReplaceAll
(
"Total real time used (s):"
,
""
);
2058
if
(s.
IsFloat
())
fRealTime
= s.
Atof
();
2059
}
else
if
(s.
Contains
(
"Total CPU time used (s):"
)) {
2060
s.
ReplaceAll
(
"Total CPU time used (s):"
,
""
);
2061
if
(s.
IsFloat
())
fCpuTime
= s.
Atof
();
2062
}
2063
}
2064
delete
mp
;
2065
}
2066
}
2067
2068
if
(verbose) {
2069
Printf
(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs"
,
2070
GetRealTime
(),
GetCpuTime
(),
GetParallel
(),
float
(
GetBytesRead
())/(1024*1024));
2071
}
2072
}
2073
2074
////////////////////////////////////////////////////////////////////////////////
2075
/// Ask the for the number of parallel slaves.
2076
2077
void
TProof::AskParallel
()
2078
{
2079
if
(!
IsValid
())
return
;
2080
2081
Broadcast
(
kPROOF_GETPARALLEL
,
kActive
);
2082
Collect
(
kActive
,
fCollectTimeout
);
2083
}
2084
2085
////////////////////////////////////////////////////////////////////////////////
2086
/// Ask the master for the list of queries.
2087
2088
TList
*
TProof::GetListOfQueries
(
Option_t
*opt)
2089
{
2090
if
(!
IsValid
() ||
TestBit
(
TProof::kIsMaster
))
return
(
TList
*)0;
2091
2092
Bool_t
all
= ((
strchr
(opt,
'A'
) ||
strchr
(opt,
'a'
))) ?
kTRUE
:
kFALSE
;
2093
TMessage
m
(
kPROOF_QUERYLIST
);
2094
m
<<
all
;
2095
Broadcast
(
m
,
kActive
);
2096
Collect
(
kActive
,
fCollectTimeout
);
2097
2098
// This should have been filled by now
2099
return
fQueries
;
2100
}
2101
2102
////////////////////////////////////////////////////////////////////////////////
2103
/// Number of queries processed by this session
2104
2105
Int_t
TProof::GetNumberOfQueries
()
2106
{
2107
if
(
fQueries
)
2108
return
fQueries
->
GetSize
() -
fOtherQueries
;
2109
return
0;
2110
}
2111
2112
////////////////////////////////////////////////////////////////////////////////
2113
/// Set max number of draw queries whose results are saved
2114
2115
void
TProof::SetMaxDrawQueries
(
Int_t
max)
2116
{
2117
if
(max > 0) {
2118
if
(
fPlayer
)
2119
fPlayer
->
SetMaxDrawQueries
(max);
2120
fMaxDrawQueries
= max;
2121
}
2122
}
2123
2124
////////////////////////////////////////////////////////////////////////////////
2125
/// Get max number of queries whose full results are kept in the
2126
/// remote sandbox
2127
2128
void
TProof::GetMaxQueries
()
2129
{
2130
TMessage
m
(
kPROOF_MAXQUERIES
);
2131
m
<<
kFALSE
;
2132
Broadcast
(
m
,
kActive
);
2133
Collect
(
kActive
,
fCollectTimeout
);
2134
}
2135
2136
////////////////////////////////////////////////////////////////////////////////
2137
/// Return pointer to the list of query results in the player
2138
2139
TList
*
TProof::GetQueryResults
()
2140
{
2141
return
(
fPlayer
?
fPlayer
->
GetListOfResults
() : (
TList
*)0);
2142
}
2143
2144
////////////////////////////////////////////////////////////////////////////////
2145
/// Return pointer to the full TQueryResult instance owned by the player
2146
/// and referenced by 'ref'. If ref = 0 or "", return the last query result.
2147
2148
TQueryResult
*
TProof::GetQueryResult
(
const
char
*ref)
2149
{
2150
return
(
fPlayer
?
fPlayer
->
GetQueryResult
(ref) : (
TQueryResult
*)0);
2151
}
2152
2153
////////////////////////////////////////////////////////////////////////////////
2154
/// Ask the master for the list of queries.
2155
/// Options:
2156
/// "A" show information about all the queries known to the
2157
/// server, i.e. even those processed by other sessions
2158
/// "L" show only information about queries locally available
2159
/// i.e. already retrieved. If "L" is specified, "A" is
2160
/// ignored.
2161
/// "F" show all details available about queries
2162
/// "H" print help menu
2163
/// Default ""
2164
2165
void
TProof::ShowQueries
(
Option_t
*opt)
2166
{
2167
Bool_t
help = ((
strchr
(opt,
'H'
) ||
strchr
(opt,
'h'
))) ?
kTRUE
:
kFALSE
;
2168
if
(help) {
2169
2170
// Help
2171
2172
Printf
(
"+++"
);
2173
Printf
(
"+++ Options: \"A\" show all queries known to server"
);
2174
Printf
(
"+++ \"L\" show retrieved queries"
);
2175
Printf
(
"+++ \"F\" full listing of query info"
);
2176
Printf
(
"+++ \"H\" print this menu"
);
2177
Printf
(
"+++"
);
2178
Printf
(
"+++ (case insensitive)"
);
2179
Printf
(
"+++"
);
2180
Printf
(
"+++ Use Retrieve(<#>) to retrieve the full"
2181
" query results from the master"
);
2182
Printf
(
"+++ e.g. Retrieve(8)"
);
2183
2184
Printf
(
"+++"
);
2185
2186
return
;
2187
}
2188
2189
if
(!
IsValid
())
return
;
2190
2191
Bool_t
local
= ((
strchr
(opt,
'L'
) ||
strchr
(opt,
'l'
))) ?
kTRUE
:
kFALSE
;
2192
2193
TObject
*
pq
= 0;
2194
if
(!
local
) {
2195
GetListOfQueries
(opt);
2196
2197
if
(!
fQueries
)
return
;
2198
2199
TIter
nxq
(
fQueries
);
2200
2201
// Queries processed by other sessions
2202
if
(
fOtherQueries
> 0) {
2203
Printf
(
"+++"
);
2204
Printf
(
"+++ Queries processed during other sessions: %d"
,
fOtherQueries
);
2205
Int_t
nq
= 0;
2206
while
(
nq
++ <
fOtherQueries
&& (
pq
=
nxq
()))
2207
pq
->Print(opt);
2208
}
2209
2210
// Queries processed by this session
2211
Printf
(
"+++"
);
2212
Printf
(
"+++ Queries processed during this session: selector: %d, draw: %d"
,
2213
GetNumberOfQueries
(),
fDrawQueries
);
2214
while
((
pq
=
nxq
()))
2215
pq
->Print(opt);
2216
2217
}
else
{
2218
2219
// Queries processed by this session
2220
Printf
(
"+++"
);
2221
Printf
(
"+++ Queries processed during this session: selector: %d, draw: %d"
,
2222
GetNumberOfQueries
(),
fDrawQueries
);
2223
2224
// Queries available locally
2225
TList
*
listlocal
=
fPlayer
?
fPlayer
->
GetListOfResults
() : (
TList
*)0;
2226
if
(
listlocal
) {
2227
Printf
(
"+++"
);
2228
Printf
(
"+++ Queries available locally: %d"
,
listlocal
->GetSize());
2229
TIter
nxlq
(
listlocal
);
2230
while
((
pq
=
nxlq
()))
2231
pq
->Print(opt);
2232
}
2233
}
2234
Printf
(
"+++"
);
2235
}
2236
2237
////////////////////////////////////////////////////////////////////////////////
2238
/// See if the data is ready to be analyzed.
2239
2240
Bool_t
TProof::IsDataReady
(
Long64_t
&
totalbytes
,
Long64_t
&
bytesready
)
2241
{
2242
if
(!
IsValid
())
return
kFALSE
;
2243
2244
TList
submasters
;
2245
TIter
nextSlave
(
GetListOfActiveSlaves
());
2246
while
(
TSlave
*
sl
=
dynamic_cast<
TSlave
*
>
(
nextSlave
())) {
2247
if
(
sl
->GetSlaveType() ==
TSlave::kMaster
) {
2248
submasters
.Add(
sl
);
2249
}
2250
}
2251
2252
fDataReady
=
kTRUE
;
//see if any submasters set it to false
2253
fBytesReady
= 0;
2254
fTotalBytes
= 0;
2255
//loop over submasters and see if data is ready
2256
if
(
submasters
.GetSize() > 0) {
2257
Broadcast
(
kPROOF_DATA_READY
, &
submasters
);
2258
Collect
(&
submasters
);
2259
}
2260
2261
bytesready
=
fBytesReady
;
2262
totalbytes
=
fTotalBytes
;
2263
2264
EmitVA
(
"IsDataReady(Long64_t,Long64_t)"
, 2,
totalbytes
,
bytesready
);
2265
2266
PDB
(kGlobal,2)
2267
Info
(
"IsDataReady"
,
"%lld / %lld (%s)"
,
2268
bytesready
,
totalbytes
,
fDataReady
?
"READY"
:
"NOT READY"
);
2269
2270
return
fDataReady
;
2271
}
2272
2273
////////////////////////////////////////////////////////////////////////////////
2274
/// Send interrupt to master or slave servers.
2275
2276
void
TProof::Interrupt
(
EUrgent
type
,
ESlaves
list)
2277
{
2278
if
(!
IsValid
())
return
;
2279
2280
TList
*
slaves
= 0;
2281
if
(list ==
kAll
)
slaves
=
fSlaves
;
2282
if
(list ==
kActive
)
slaves
=
fActiveSlaves
;
2283
if
(list ==
kUnique
)
slaves
=
fUniqueSlaves
;
2284
if
(list ==
kAllUnique
)
slaves
=
fAllUniqueSlaves
;
2285
2286
if
(
slaves
->GetSize() == 0)
return
;
2287
2288
TSlave
*
sl
;
2289
TIter
next(
slaves
);
2290
2291
while
((
sl
= (
TSlave
*)next())) {
2292
if
(
sl
->IsValid()) {
2293
2294
// Ask slave to progate the interrupt request
2295
sl
->Interrupt((
Int_t
)
type
);
2296
}
2297
}
2298
}
2299
2300
////////////////////////////////////////////////////////////////////////////////
2301
/// Returns number of slaves active in parallel mode. Returns 0 in case
2302
/// there are no active slaves. Returns -1 in case of error.
2303
2304
Int_t
TProof::GetParallel
()
const
2305
{
2306
if
(!
IsValid
())
return
-1;
2307
2308
// iterate over active slaves and return total number of slaves
2309
TIter
nextSlave
(
GetListOfActiveSlaves
());
2310
Int_t
nparallel
= 0;
2311
while
(
TSlave
*
sl
=
dynamic_cast<
TSlave
*
>
(
nextSlave
()))
2312
if
(
sl
->GetParallel() >= 0)
2313
nparallel
+=
sl
->GetParallel();
2314
2315
return
nparallel
;
2316
}
2317
2318
////////////////////////////////////////////////////////////////////////////////
2319
/// Returns list of TSlaveInfo's. In case of error return 0.
2320
2321
TList
*
TProof::GetListOfSlaveInfos
()
2322
{
2323
if
(!
IsValid
())
return
0;
2324
2325
if
(
fSlaveInfo
== 0) {
2326
fSlaveInfo
=
new
TSortedList
(
kSortDescending
);
2327
fSlaveInfo
->
SetOwner
();
2328
}
else
{
2329
fSlaveInfo
->
Delete
();
2330
}
2331
2332
TList
masters
;
2333
TIter
next(
GetListOfSlaves
());
2334
TSlave
*
slave
;
2335
2336
while
((
slave
= (
TSlave
*) next()) != 0) {
2337
if
(
slave
->GetSlaveType() ==
TSlave::kSlave
) {
2338
const
char
*
name
=
IsLite
() ?
gSystem
->
HostName
() :
slave
->GetName();
2339
TSlaveInfo
*
slaveinfo
=
new
TSlaveInfo
(
slave
->GetOrdinal(),
2340
name
,
2341
slave
->GetPerfIdx());
2342
fSlaveInfo
->
Add
(
slaveinfo
);
2343
2344
TIter
nextactive
(
GetListOfActiveSlaves
());
2345
TSlave
*
activeslave
;
2346
while
((
activeslave
= (
TSlave
*)
nextactive
())) {
2347
if
(
TString
(
slaveinfo
->GetOrdinal()) ==
activeslave
->GetOrdinal()) {
2348
slaveinfo
->SetStatus(
TSlaveInfo::kActive
);
2349
break
;
2350
}
2351
}
2352
2353
TIter
nextbad
(
GetListOfBadSlaves
());
2354
TSlave
*
badslave
;
2355
while
((
badslave
= (
TSlave
*)
nextbad
())) {
2356
if
(
TString
(
slaveinfo
->GetOrdinal()) ==
badslave
->GetOrdinal()) {
2357
slaveinfo
->SetStatus(
TSlaveInfo::kBad
);
2358
break
;
2359
}
2360
}
2361
// Get system info if supported
2362
if
(
slave
->IsValid()) {
2363
if
(
slave
->GetSocket()->Send(
kPROOF_GETSLAVEINFO
) == -1)
2364
MarkBad
(
slave
,
"could not send kPROOF_GETSLAVEINFO message"
);
2365
else
2366
masters
.Add(
slave
);
2367
}
2368
2369
}
else
if
(
slave
->GetSlaveType() ==
TSlave::kMaster
) {
2370
if
(
slave
->IsValid()) {
2371
if
(
slave
->GetSocket()->Send(
kPROOF_GETSLAVEINFO
) == -1)
2372
MarkBad
(
slave
,
"could not send kPROOF_GETSLAVEINFO message"
);
2373
else
2374
masters
.Add(
slave
);
2375
}
2376
}
else
{
2377
Error
(
"GetSlaveInfo"
,
"TSlave is neither Master nor Slave"
);
2378
R__ASSERT
(0);
2379
}
2380
}
2381
if
(
masters
.GetSize() > 0)
Collect
(&
masters
);
2382
2383
return
fSlaveInfo
;
2384
}
2385
2386
////////////////////////////////////////////////////////////////////////////////
2387
/// Activate slave server list.
2388
2389
void
TProof::Activate
(
TList
*
slaves
)
2390
{
2391
TMonitor
*
mon
=
fAllMonitor
;
2392
mon
->DeActivateAll();
2393
2394
slaves
= !
slaves
?
fActiveSlaves
:
slaves
;
2395
2396
TIter
next(
slaves
);
2397
TSlave
*
sl
;
2398
while
((
sl
= (
TSlave
*) next())) {
2399
if
(
sl
->IsValid())
2400
mon
->Activate(
sl
->GetSocket());
2401
}
2402
}
2403
2404
////////////////////////////////////////////////////////////////////////////////
2405
/// Activate (on == TRUE) or deactivate (on == FALSE) all sockets
2406
/// monitored by 'mon'.
2407
2408
void
TProof::SetMonitor
(
TMonitor
*
mon
,
Bool_t
on
)
2409
{
2410
TMonitor
*
m
= (
mon
) ?
mon
:
fCurrentMonitor
;
2411
if
(
m
) {
2412
if
(
on
)
2413
m
->ActivateAll();
2414
else
2415
m
->DeActivateAll();
2416
}
2417
}
2418
2419
////////////////////////////////////////////////////////////////////////////////
2420
/// Broadcast the group priority to all workers in the specified list. Returns
2421
/// the number of workers the message was successfully sent to.
2422
/// Returns -1 in case of error.
2423
2424
Int_t
TProof::BroadcastGroupPriority
(
const
char
*
grp
,
Int_t
priority
,
TList
*
workers
)
2425
{
2426
if
(!
IsValid
())
return
-1;
2427
2428
if
(
workers
->GetSize() == 0)
return
0;
2429
2430
int
nsent
= 0;
2431
TIter
next(
workers
);
2432
2433
TSlave
*
wrk
;
2434
while
((
wrk
= (
TSlave
*)next())) {
2435
if
(
wrk
->IsValid()) {
2436
if
(
wrk
->SendGroupPriority(
grp
,
priority
) == -1)
2437
MarkBad
(
wrk
,
"could not send group priority"
);
2438
else
2439
nsent
++;
2440
}
2441
}
2442
2443
return
nsent
;
2444
}
2445
2446
////////////////////////////////////////////////////////////////////////////////
2447
/// Broadcast the group priority to all workers in the specified list. Returns
2448
/// the number of workers the message was successfully sent to.
2449
/// Returns -1 in case of error.
2450
2451
Int_t
TProof::BroadcastGroupPriority
(
const
char
*
grp
,
Int_t
priority
,
ESlaves
list)
2452
{
2453
TList
*
workers
= 0;
2454
if
(list ==
kAll
)
workers
=
fSlaves
;
2455
if
(list ==
kActive
)
workers
=
fActiveSlaves
;
2456
if
(list ==
kUnique
)
workers
=
fUniqueSlaves
;
2457
if
(list ==
kAllUnique
)
workers
=
fAllUniqueSlaves
;
2458
2459
return
BroadcastGroupPriority
(
grp
,
priority
,
workers
);
2460
}
2461
2462
////////////////////////////////////////////////////////////////////////////////
2463
/// Reset the merge progress notificator
2464
2465
void
TProof::ResetMergePrg
()
2466
{
2467
fMergePrg
.
Reset
(
fActiveSlaves
->
GetSize
());
2468
}
2469
2470
////////////////////////////////////////////////////////////////////////////////
2471
/// Broadcast a message to all slaves in the specified list. Returns
2472
/// the number of slaves the message was successfully sent to.
2473
/// Returns -1 in case of error.
2474
2475
Int_t
TProof::Broadcast
(
const
TMessage
&
mess
,
TList
*
slaves
)
2476
{
2477
if
(!
IsValid
())
return
-1;
2478
2479
if
(!
slaves
||
slaves
->GetSize() == 0)
return
0;
2480
2481
int
nsent
= 0;
2482
TIter
next(
slaves
);
2483
2484
TSlave
*
sl
;
2485
while
((
sl
= (
TSlave
*)next())) {
2486
if
(
sl
->IsValid()) {
2487
if
(
sl
->GetSocket()->Send(
mess
) == -1)
2488
MarkBad
(
sl
,
"could not broadcast request"
);
2489
else
2490
nsent
++;
2491
}
2492
}
2493
2494
return
nsent
;
2495
}
2496
2497
////////////////////////////////////////////////////////////////////////////////
2498
/// Broadcast a message to all slaves in the specified list (either
2499
/// all slaves or only the active slaves). Returns the number of slaves
2500
/// the message was successfully sent to. Returns -1 in case of error.
2501
2502
Int_t
TProof::Broadcast
(
const
TMessage
&
mess
,
ESlaves
list)
2503
{
2504
TList
*
slaves
= 0;
2505
if
(list ==
kAll
)
slaves
=
fSlaves
;
2506
if
(list ==
kActive
)
slaves
=
fActiveSlaves
;
2507
if
(list ==
kUnique
)
slaves
=
fUniqueSlaves
;
2508
if
(list ==
kAllUnique
)
slaves
=
fAllUniqueSlaves
;
2509
2510
return
Broadcast
(
mess
,
slaves
);
2511
}
2512
2513
////////////////////////////////////////////////////////////////////////////////
2514
/// Broadcast a character string buffer to all slaves in the specified
2515
/// list. Use kind to set the TMessage what field. Returns the number of
2516
/// slaves the message was sent to. Returns -1 in case of error.
2517
2518
Int_t
TProof::Broadcast
(
const
char
*str,
Int_t
kind,
TList
*
slaves
)
2519
{
2520
TMessage
mess
(kind);
2521
if
(str)
mess
.WriteString(str);
2522
return
Broadcast
(
mess
,
slaves
);
2523
}
2524
2525
////////////////////////////////////////////////////////////////////////////////
2526
/// Broadcast a character string buffer to all slaves in the specified
2527
/// list (either all slaves or only the active slaves). Use kind to
2528
/// set the TMessage what field. Returns the number of slaves the message
2529
/// was sent to. Returns -1 in case of error.
2530
2531
Int_t
TProof::Broadcast
(
const
char
*str,
Int_t
kind,
ESlaves
list)
2532
{
2533
TMessage
mess
(kind);
2534
if
(str)
mess
.WriteString(str);
2535
return
Broadcast
(
mess
, list);
2536
}
2537
2538
////////////////////////////////////////////////////////////////////////////////
2539
/// Broadcast an object to all slaves in the specified list. Use kind to
2540
/// set the TMEssage what field. Returns the number of slaves the message
2541
/// was sent to. Returns -1 in case of error.
2542
2543
Int_t
TProof::BroadcastObject
(
const
TObject
*obj,
Int_t
kind,
TList
*
slaves
)
2544
{
2545
TMessage
mess
(kind);
2546
mess
.WriteObject(obj);
2547
return
Broadcast
(
mess
,
slaves
);
2548
}
2549
2550
////////////////////////////////////////////////////////////////////////////////
2551
/// Broadcast an object to all slaves in the specified list. Use kind to
2552
/// set the TMEssage what field. Returns the number of slaves the message
2553
/// was sent to. Returns -1 in case of error.
2554
2555
Int_t
TProof::BroadcastObject
(
const
TObject
*obj,
Int_t
kind,
ESlaves
list)
2556
{
2557
TMessage
mess
(kind);
2558
mess
.WriteObject(obj);
2559
return
Broadcast
(
mess
, list);
2560
}
2561
2562
////////////////////////////////////////////////////////////////////////////////
2563
/// Broadcast a raw buffer of specified length to all slaves in the
2564
/// specified list. Returns the number of slaves the buffer was sent to.
2565
/// Returns -1 in case of error.
2566
2567
Int_t
TProof::BroadcastRaw
(
const
void
*buffer,
Int_t
length
,
TList
*
slaves
)
2568
{
2569
if
(!
IsValid
())
return
-1;
2570
2571
if
(
slaves
->GetSize() == 0)
return
0;
2572
2573
int
nsent
= 0;
2574
TIter
next(
slaves
);
2575
2576
TSlave
*
sl
;
2577
while
((
sl
= (
TSlave
*)next())) {
2578
if
(
sl
->IsValid()) {
2579
if
(
sl
->GetSocket()->SendRaw(buffer,
length
) == -1)
2580
MarkBad
(
sl
,
"could not send broadcast-raw request"
);
2581
else
2582
nsent
++;
2583
}
2584
}
2585
2586
return
nsent
;
2587
}
2588
2589
////////////////////////////////////////////////////////////////////////////////
2590
/// Broadcast a raw buffer of specified length to all slaves in the
2591
/// specified list. Returns the number of slaves the buffer was sent to.
2592
/// Returns -1 in case of error.
2593
2594
Int_t
TProof::BroadcastRaw
(
const
void
*buffer,
Int_t
length
,
ESlaves
list)
2595
{
2596
TList
*
slaves
= 0;
2597
if
(list ==
kAll
)
slaves
=
fSlaves
;
2598
if
(list ==
kActive
)
slaves
=
fActiveSlaves
;
2599
if
(list ==
kUnique
)
slaves
=
fUniqueSlaves
;
2600
if
(list ==
kAllUnique
)
slaves
=
fAllUniqueSlaves
;
2601
2602
return
BroadcastRaw
(buffer,
length
,
slaves
);
2603
}
2604
2605
////////////////////////////////////////////////////////////////////////////////
2606
/// Broadcast file to all workers in the specified list. Returns the number of workers
2607
/// the buffer was sent to.
2608
/// Returns -1 in case of error.
2609
2610
Int_t
TProof::BroadcastFile
(
const
char
*file,
Int_t
opt,
const
char
*
rfile
,
TList
*
wrks
)
2611
{
2612
if
(!
IsValid
())
return
-1;
2613
2614
if
(
wrks
->GetSize() == 0)
return
0;
2615
2616
int
nsent
= 0;
2617
TIter
next(
wrks
);
2618
2619
TSlave
*
wrk
;
2620
while
((
wrk
= (
TSlave
*)next())) {
2621
if
(
wrk
->IsValid()) {
2622
if
(
SendFile
(file, opt,
rfile
,
wrk
) < 0)
2623
Error
(
"BroadcastFile"
,
2624
"problems sending file to worker %s (%s)"
,
2625
wrk
->GetOrdinal(),
wrk
->GetName());
2626
else
2627
nsent
++;
2628
}
2629
}
2630
2631
return
nsent
;
2632
}
2633
2634
////////////////////////////////////////////////////////////////////////////////
2635
/// Broadcast file to all workers in the specified list. Returns the number of workers
2636
/// the buffer was sent to.
2637
/// Returns -1 in case of error.
2638
2639
Int_t
TProof::BroadcastFile
(
const
char
*file,
Int_t
opt,
const
char
*
rfile
,
ESlaves
list)
2640
{
2641
TList
*
wrks
= 0;
2642
if
(list ==
kAll
)
wrks
=
fSlaves
;
2643
if
(list ==
kActive
)
wrks
=
fActiveSlaves
;
2644
if
(list ==
kUnique
)
wrks
=
fUniqueSlaves
;
2645
if
(list ==
kAllUnique
)
wrks
=
fAllUniqueSlaves
;
2646
2647
return
BroadcastFile
(file, opt,
rfile
,
wrks
);
2648
}
2649
2650
////////////////////////////////////////////////////////////////////////////////
2651
/// Release the used monitor to be used, making sure to delete newly created
2652
/// monitors.
2653
2654
void
TProof::ReleaseMonitor
(
TMonitor
*
mon
)
2655
{
2656
if
(
mon
&& (
mon
!=
fAllMonitor
) && (
mon
!=
fActiveMonitor
)
2657
&& (
mon
!=
fUniqueMonitor
) && (
mon
!=
fAllUniqueMonitor
)) {
2658
delete
mon
;
2659
}
2660
}
2661
2662
////////////////////////////////////////////////////////////////////////////////
2663
/// Collect responses from slave sl. Returns the number of slaves that
2664
/// responded (=1).
2665
/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2666
/// which means wait forever).
2667
/// If defined (>= 0) endtype is the message that stops this collection.
2668
2669
Int_t
TProof::Collect
(
const
TSlave
*
sl
,
Long_t
timeout
,
Int_t
endtype
,
Bool_t
deactonfail
)
2670
{
2671
Int_t
rc
= 0;
2672
2673
TMonitor
*
mon
= 0;
2674
if
(!
sl
->IsValid())
return
0;
2675
2676
if
(
fCurrentMonitor
==
fAllMonitor
) {
2677
mon
=
new
TMonitor
;
2678
}
else
{
2679
mon
=
fAllMonitor
;
2680
mon
->DeActivateAll();
2681
}
2682
mon
->Activate(
sl
->GetSocket());
2683
2684
rc
=
Collect
(
mon
,
timeout
,
endtype
,
deactonfail
);
2685
ReleaseMonitor
(
mon
);
2686
return
rc
;
2687
}
2688
2689
////////////////////////////////////////////////////////////////////////////////
2690
/// Collect responses from the slave servers. Returns the number of slaves
2691
/// that responded.
2692
/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2693
/// which means wait forever).
2694
/// If defined (>= 0) endtype is the message that stops this collection.
2695
2696
Int_t
TProof::Collect
(
TList
*
slaves
,
Long_t
timeout
,
Int_t
endtype
,
Bool_t
deactonfail
)
2697
{
2698
Int_t
rc
= 0;
2699
2700
TMonitor
*
mon
= 0;
2701
2702
if
(
fCurrentMonitor
==
fAllMonitor
) {
2703
mon
=
new
TMonitor
;
2704
}
else
{
2705
mon
=
fAllMonitor
;
2706
mon
->DeActivateAll();
2707
}
2708
TIter
next(
slaves
);
2709
TSlave
*
sl
;
2710
while
((
sl
= (
TSlave
*) next())) {
2711
if
(
sl
->IsValid())
2712
mon
->Activate(
sl
->GetSocket());
2713
}
2714
2715
rc
=
Collect
(
mon
,
timeout
,
endtype
,
deactonfail
);
2716
ReleaseMonitor
(
mon
);
2717
return
rc
;
2718
}
2719
2720
////////////////////////////////////////////////////////////////////////////////
2721
/// Collect responses from the slave servers. Returns the number of slaves
2722
/// that responded.
2723
/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2724
/// which means wait forever).
2725
/// If defined (>= 0) endtype is the message that stops this collection.
2726
2727
Int_t
TProof::Collect
(
ESlaves
list,
Long_t
timeout
,
Int_t
endtype
,
Bool_t
deactonfail
)
2728
{
2729
Int_t
rc
= 0;
2730
TMonitor
*
mon
= 0;
2731
2732
if
(list ==
kAll
)
mon
=
fAllMonitor
;
2733
if
(list ==
kActive
)
mon
=
fActiveMonitor
;
2734
if
(list ==
kUnique
)
mon
=
fUniqueMonitor
;
2735
if
(list ==
kAllUnique
)
mon
=
fAllUniqueMonitor
;
2736
if
(
fCurrentMonitor
==
mon
) {
2737
// Get a copy
2738
mon
=
new
TMonitor
(*
mon
);
2739
}
2740
mon
->ActivateAll();
2741
2742
rc
=
Collect
(
mon
,
timeout
,
endtype
,
deactonfail
);
2743
ReleaseMonitor
(
mon
);
2744
return
rc
;
2745
}
2746
2747
////////////////////////////////////////////////////////////////////////////////
2748
/// Collect responses from the slave servers. Returns the number of messages
2749
/// received. Can be 0 if there are no active slaves.
2750
/// If timeout >= 0, wait at most timeout seconds (timeout = -1 by default,
2751
/// which means wait forever).
2752
/// If defined (>= 0) endtype is the message that stops this collection.
2753
/// Collect also stops its execution from time to time to check for new
2754
/// workers in Dynamic Startup mode.
2755
2756
Int_t
TProof::Collect
(
TMonitor
*
mon
,
Long_t
timeout
,
Int_t
endtype
,
Bool_t
deactonfail
)
2757
{
2758
Int_t
collectId
=
gRandom
->
Integer
(9999);
2759
2760
PDB
(kCollect, 3)
2761
Info
(
"Collect"
,
">>>>>> Entering collect responses #%04d"
,
collectId
);
2762
2763
// Reset the status flag and clear the messages in the list, if any
2764
fStatus
= 0;
2765
fRecvMessages
->
Clear
();
2766
2767
Long_t
actto
= (
Long_t
)(
gEnv
->
GetValue
(
"Proof.SocketActivityTimeout"
, -1) * 1000);
2768
2769
if
(!
mon
->GetActive(
actto
))
return
0;
2770
2771
DeActivateAsyncInput
();
2772
2773
// Used by external code to know what we are monitoring
2774
TMonitor
*
savedMonitor
= 0;
2775
if
(
fCurrentMonitor
) {
2776
savedMonitor
=
fCurrentMonitor
;
2777
fCurrentMonitor
=
mon
;
2778
}
else
{
2779
fCurrentMonitor
=
mon
;
2780
fBytesRead
= 0;
2781
fRealTime
= 0.0;
2782
fCpuTime
= 0.0;
2783
}
2784
2785
// We want messages on the main window during synchronous collection,
2786
// but we save the present status to restore it at the end
2787
Bool_t
saveRedirLog
=
fRedirLog
;
2788
if
(!
IsIdle
() && !
IsSync
())
2789
fRedirLog
=
kFALSE
;
2790
2791
int
cnt = 0,
rc
= 0;
2792
2793
// Timeout counter
2794
Long_t
nto
=
timeout
;
2795
PDB
(kCollect, 2)
2796
Info
(
"Collect"
,
"#%04d: active: %d"
,
collectId
,
mon
->GetActive());
2797
2798
// On clients, handle Ctrl-C during collection
2799
if
(
fIntHandler
)
2800
fIntHandler
->
Add
();
2801
2802
// Sockets w/o activity during the last 'sto' millisecs are deactivated
2803
Int_t
nact
= 0;
2804
Long_t
sto
= -1;
2805
Int_t
nsto
= 60;
2806
Int_t
pollint
=
gEnv
->
GetValue
(
"Proof.DynamicStartupPollInt"
, (
Int_t
)
kPROOF_DynWrkPollInt_s
);
2807
mon
->ResetInterrupt();
2808
while
((
nact
=
mon
->GetActive(
sto
)) && (
nto < 0 || nto >
0)) {
2809
2810
// Dump last waiting sockets, if in debug mode
2811
PDB
(kCollect, 2) {
2812
if
(
nact
< 4) {
2813
TList
*
al
=
mon
->GetListOfActives();
2814
if
(
al
&&
al
->GetSize() > 0) {
2815
Info
(
"Collect"
,
" %d node(s) still active:"
,
al
->GetSize());
2816
TIter
nxs
(
al
);
2817
TSocket
*
xs
= 0;
2818
while
((
xs
= (
TSocket
*)
nxs
())) {
2819
TSlave
*
wrk
=
FindSlave
(
xs
);
2820
if
(
wrk
)
2821
Info
(
"Collect"
,
" %s (%s)"
,
wrk
->GetName(),
wrk
->GetOrdinal());
2822
else
2823
Info
(
"Collect"
,
" %p: %s:%d"
,
xs
,
xs
->GetInetAddress().GetHostName(),
2824
xs
->GetInetAddress().GetPort());
2825
}
2826
}
2827
delete
al
;
2828
}
2829
}
2830
2831
// Preemptive poll for new workers on the master only in Dynamic Mode and only
2832
// during processing (TODO: should work on Top Master only)
2833
if
(
TestBit
(
TProof::kIsMaster
) && !
IsIdle
() &&
fDynamicStartup
&& !
fIsPollingWorkers
&&
2834
((
fLastPollWorkers_s
== -1) || (time(0)-
fLastPollWorkers_s
>=
pollint
))) {
2835
fIsPollingWorkers
=
kTRUE
;
2836
if
(
PollForNewWorkers
() > 0)
DeActivateAsyncInput
();
2837
fLastPollWorkers_s
= time(0);
2838
fIsPollingWorkers
=
kFALSE
;
2839
PDB
(kCollect, 1)
2840
Info
(
"Collect"
,
"#%04d: now active: %d"
,
collectId
,
mon
->GetActive());
2841
}
2842
2843
// Wait for a ready socket
2844
PDB
(kCollect, 3)
2845
Info
(
"Collect"
,
"Will invoke Select() #%04d"
,
collectId
);
2846
TSocket
*s =
mon
->Select(1000);
2847
2848
if
(s && s != (
TSocket
*)(-1)) {
2849
// Get and analyse the info it did receive
2850
rc
=
CollectInputFrom
(s,
endtype
,
deactonfail
);
2851
if
(
rc
== 1 || (
rc
== 2 && !
savedMonitor
)) {
2852
// Deactivate it if we are done with it
2853
mon
->DeActivate(s);
2854
TList
*
al
=
mon
->GetListOfActives();
2855
PDB
(kCollect, 2)
2856
Info
(
"Collect"
,
"#%04d: deactivating %p (active: %d, %p)"
,
collectId
,
2857
s,
mon
->GetActive(),
2858
al
->First());
2859
delete
al
;
2860
}
else
if
(
rc
== 2) {
2861
// This end message was for the saved monitor
2862
// Deactivate it if we are done with it
2863
if
(
savedMonitor
) {
2864
savedMonitor
->DeActivate(s);
2865
TList
*
al
=
mon
->GetListOfActives();
2866
PDB
(kCollect, 2)
2867
Info
(
"Collect"
,
"save monitor: deactivating %p (active: %d, %p)"
,
2868
s,
savedMonitor
->GetActive(),
2869
al
->First());
2870
delete
al
;
2871
}
2872
}
2873
2874
// Update counter (if no error occured)
2875
if
(
rc
>= 0)
2876
cnt++;
2877
}
else
{
2878
// If not timed-out, exit if not stopped or not aborted
2879
// (player exits status is finished in such a case); otherwise,
2880
// we still need to collect the partial output info
2881
if
(!s)
2882
if
(
fPlayer
&& (
fPlayer
->
GetExitStatus
() ==
TVirtualProofPlayer::kFinished
))
2883
mon
->DeActivateAll();
2884
// Decrease the timeout counter if requested
2885
if
(s == (
TSocket
*)(-1) &&
nto
> 0)
2886
nto
--;
2887
}
2888
2889
// Check if there are workers with ready output to be sent and ask the first to send it
2890
if
(
IsMaster
() &&
fWrksOutputReady
&&
fWrksOutputReady
->
GetSize
() > 0) {
2891
// Maximum number of concurrent sendings
2892
Int_t
mxws
=
gEnv
->
GetValue
(
"Proof.ControlSendOutput"
, 1);
2893
if
(
TProof::GetParameter
(
fPlayer
->
GetInputList
(),
"PROOF_ControlSendOutput"
,
mxws
) != 0)
2894
mxws
=
gEnv
->
GetValue
(
"Proof.ControlSendOutput"
, 1);
2895
TIter
nxwr
(
fWrksOutputReady
);
2896
TSlave
*
wrk
= 0;
2897
while
(
mxws
&& (
wrk
= (
TSlave
*)
nxwr
())) {
2898
if
(!
wrk
->TestBit(
TSlave::kOutputRequested
)) {
2899
// Ask worker for output
2900
TMessage
sendoutput
(
kPROOF_SENDOUTPUT
);
2901
PDB
(kCollect, 2)
2902
Info
(
"Collect"
,
"worker %s was asked to send its output to master"
,
2903
wrk
->GetOrdinal());
2904
if
(
wrk
->GetSocket()->Send(
sendoutput
) != 1) {
2905
wrk
->SetBit(
TSlave::kOutputRequested
);
2906
mxws
--;
2907
}
2908
}
else
{
2909
// Count
2910
mxws
--;
2911
}
2912
}
2913
}
2914
2915
// Check if we need to check the socket activity (we do it every 10 cycles ~ 10 sec)
2916
sto
= -1;
2917
if
(--
nsto
<= 0) {
2918
sto
= (
Long_t
)
actto
;
2919
nsto
= 60;
2920
}
2921
2922
}
// end loop over active monitors
2923
2924
// If timed-out, deactivate the remaining sockets
2925
if
(
nto
== 0) {
2926
TList
*
al
=
mon
->GetListOfActives();
2927
if
(
al
&&
al
->GetSize() > 0) {
2928
// Notify the name of those which did timeout
2929
Info
(
"Collect"
,
" %d node(s) went in timeout:"
,
al
->GetSize());
2930
TIter
nxs
(
al
);
2931
TSocket
*
xs
= 0;
2932
while
((
xs
= (
TSocket
*)
nxs
())) {
2933
TSlave
*
wrk
=
FindSlave
(
xs
);
2934
if
(
wrk
)
2935
Info
(
"Collect"
,
" %s"
,
wrk
->GetName());
2936
else
2937
Info
(
"Collect"
,
" %p: %s:%d"
,
xs
,
xs
->GetInetAddress().GetHostName(),
2938
xs
->GetInetAddress().GetPort());
2939
}
2940
}
2941
delete
al
;
2942
mon
->DeActivateAll();
2943
}
2944
2945
// Deactivate Ctrl-C special handler
2946
if
(
fIntHandler
)
2947
fIntHandler
->
Remove
();
2948
2949
// make sure group view is up to date
2950
SendGroupView
();
2951
2952
// Restore redirection setting
2953
fRedirLog
=
saveRedirLog
;
2954
2955
// Restore the monitor
2956
fCurrentMonitor
=
savedMonitor
;
2957
2958
ActivateAsyncInput
();
2959
2960
PDB
(kCollect, 3)
2961
Info
(
"Collect"
,
"<<<<<< Exiting collect responses #%04d"
,
collectId
);
2962
2963
return
cnt;
2964
}
2965
2966
////////////////////////////////////////////////////////////////////////////////
2967
/// Asks the PROOF Serv for new workers in Dynamic Startup mode and activates
2968
/// them. Returns the number of new workers found, or <0 on errors.
2969
2970
Int_t
TProof::PollForNewWorkers
()
2971
{
2972
// Requests for worker updates
2973
Int_t
dummy = 0;
2974
TList
*
reqWorkers
=
new
TList
();
2975
reqWorkers
->SetOwner(
kFALSE
);
2976
2977
if
(!
TestBit
(
TProof::kIsMaster
)) {
2978
Error
(
"PollForNewWorkers"
,
"Can't invoke: not on a master -- should not happen!"
);
2979
return
-1;
2980
}
2981
if
(!
gProofServ
) {
2982
Error
(
"PollForNewWorkers"
,
"No ProofServ available -- should not happen!"
);
2983
return
-1;
2984
}
2985
2986
gProofServ
->
GetWorkers
(
reqWorkers
, dummy,
kTRUE
);
// last 2 are dummy
2987
2988
// List of new workers only (TProofNodeInfo)
2989
TList
*
newWorkers
=
new
TList
();
2990
newWorkers
->SetOwner(
kTRUE
);
2991
2992
TIter
next(
reqWorkers
);
2993
TProofNodeInfo
*
ni
;
2994
TString
fullOrd
;
2995
while
((
ni
=
dynamic_cast<
TProofNodeInfo
*
>
(next()) )) {
2996
2997
// Form the full ordinal
2998
fullOrd
.Form(
"%s.%s"
,
gProofServ
->
GetOrdinal
(),
ni
->GetOrdinal().Data());
2999
3000
TIter
nextInner
(
fSlaves
);
3001
TSlave
*
sl
;
3002
Bool_t
found =
kFALSE
;
3003
while
((
sl
=
dynamic_cast<
TSlave
*
>
(
nextInner
()) )) {
3004
if
(
strcmp
(
sl
->GetOrdinal(),
fullOrd
.Data()) == 0 ) {
3005
found =
kTRUE
;
3006
break
;
3007
}
3008
}
3009
3010
if
(found)
delete
ni
;
3011
else
{
3012
newWorkers
->Add(
ni
);
3013
PDB
(kGlobal, 1)
3014
Info
(
"PollForNewWorkers"
,
"New worker found: %s:%s"
,
3015
ni
->GetNodeName().Data(),
fullOrd
.Data());
3016
}
3017
}
3018
3019
delete
reqWorkers
;
// not owner
3020
3021
Int_t
nNewWorkers
=
newWorkers
->GetEntries();
3022
3023
// Add the new workers
3024
if
(
nNewWorkers
> 0) {
3025
PDB
(kGlobal, 1)
3026
Info
(
"PollForNewWorkers"
,
"Requesting to add %d new worker(s)"
,
newWorkers
->GetEntries());
3027
Int_t
rv
=
AddWorkers
(
newWorkers
);
3028
if
(
rv
< 0) {
3029
Error
(
"PollForNewWorkers"
,
"Call to AddWorkers() failed (got %d < 0)"
,
rv
);
3030
return
-1;
3031
}
3032
// Don't delete newWorkers: AddWorkers() will do that
3033
}
3034
else
{
3035
PDB
(kGlobal, 2)
3036
Info
(
"PollForNewWorkers"
,
"No new worker found"
);
3037
delete
newWorkers
;
3038
}
3039
3040
return
nNewWorkers
;
3041
}
3042
3043
////////////////////////////////////////////////////////////////////////////////
3044
/// Remove links to objects in list 'ol' from gDirectory
3045
3046
void
TProof::CleanGDirectory
(
TList
*
ol
)
3047
{
3048
if
(
ol
) {
3049
TIter
nxo
(
ol
);
3050
TObject
*o = 0;
3051
while
((o =
nxo
()))
3052
gDirectory
->
RecursiveRemove
(o);
3053
}
3054
}
3055
3056
////////////////////////////////////////////////////////////////////////////////
3057
/// Collect and analyze available input from socket s.
3058
/// Returns 0 on success, -1 if any failure occurs.
3059
3060
Int_t
TProof::CollectInputFrom
(
TSocket
*s,
Int_t
endtype
,
Bool_t
deactonfail
)
3061
{
3062
TMessage
*
mess
;
3063
3064
Int_t
recvrc
= 0;
3065
if
((
recvrc
= s->
Recv
(
mess
)) < 0) {
3066
PDB
(kCollect,2)
3067
Info
(
"CollectInputFrom"
,
"%p: got %d from Recv()"
, s,
recvrc
);
3068
Bool_t
bad
=
kTRUE
;
3069
if
(
recvrc
== -5) {
3070
// Broken connection: try reconnection
3071
if
(
fCurrentMonitor
)
fCurrentMonitor
->
Remove
(s);
3072
if
(s->
Reconnect
() == 0) {
3073
if
(
fCurrentMonitor
)
fCurrentMonitor
->
Add
(s);
3074
bad
=
kFALSE
;
3075
}
3076
}
3077
if
(
bad
)
3078
MarkBad
(s,
"problems receiving a message in TProof::CollectInputFrom(...)"
);
3079
// Ignore this wake up
3080
return
-1;
3081
}
3082
if
(!
mess
) {
3083
// we get here in case the remote server died
3084
MarkBad
(s,
"undefined message in TProof::CollectInputFrom(...)"
);
3085
return
-1;
3086
}
3087
Int_t
rc
= 0;
3088
3089
Int_t
what
=
mess
->What();
3090
TSlave
*
sl
=
FindSlave
(s);
3091
rc
=
HandleInputMessage
(
sl
,
mess
,
deactonfail
);
3092
if
(
rc
== 1 && (
endtype
>= 0) && (
what
!=
endtype
))
3093
// This message was for the base monitor in recursive case
3094
rc
= 2;
3095
3096
// We are done successfully
3097
return
rc
;
3098
}
3099
3100
////////////////////////////////////////////////////////////////////////////////
3101
/// Analyze the received message.
3102
/// Returns 0 on success (1 if this the last message from this socket), -1 if
3103
/// any failure occurs.
3104
3105
Int_t
TProof::HandleInputMessage
(
TSlave
*
sl
,
TMessage
*
mess
,
Bool_t
deactonfail
)
3106
{
3107
char
str[512];
3108
TObject
*obj;
3109
Int_t
rc
= 0;
3110
3111
if
(!
mess
|| !
sl
) {
3112
Warning
(
"HandleInputMessage"
,
"given an empty message or undefined worker"
);
3113
return
-1;
3114
}
3115
Bool_t
delete_mess
=
kTRUE
;
3116
TSocket
*s =
sl
->GetSocket();
3117
if
(!s) {
3118
Warning
(
"HandleInputMessage"
,
"worker socket is undefined"
);
3119
return
-1;
3120
}
3121
3122
// The message type
3123
Int_t
what
=
mess
->What();
3124
3125
PDB
(kCollect,3)
3126
Info
(
"HandleInputMessage"
,
"got type %d from '%s'"
,
what
,
sl
->GetOrdinal());
3127
3128
switch
(
what
) {
3129
3130
case
kMESS_OK
:
3131
// Add the message to the list
3132
fRecvMessages
->
Add
(
mess
);
3133
delete_mess
=
kFALSE
;
3134
break
;
3135
3136
case
kMESS_OBJECT
:
3137
if
(
fPlayer
)
fPlayer
->
HandleRecvHisto
(
mess
);
3138
break
;
3139
3140
case
kPROOF_FATAL
:
3141
{
TString
msg
;
3142
if
((
mess
->BufferSize() >
mess
->Length()))
3143
(*mess) >>
msg
;
3144
if
(
msg
.IsNull()) {
3145
MarkBad
(s,
"received kPROOF_FATAL"
);
3146
}
else
{
3147
MarkBad
(s,
msg
);
3148
}
3149
}
3150
if
(
fProgressDialogStarted
) {
3151
// Finalize the progress dialog
3152
Emit
(
"StopProcess(Bool_t)"
,
kTRUE
);
3153
}
3154
break
;
3155
3156
case
kPROOF_STOP
:
3157
// Stop collection from this worker
3158
Info
(
"HandleInputMessage"
,
"received kPROOF_STOP from %s: disabling any further collection this worker"
,
3159
sl
->GetOrdinal());
3160
rc
= 1;
3161
break
;
3162
3163
case
kPROOF_GETTREEHEADER
:
3164
// Add the message to the list
3165
fRecvMessages
->
Add
(
mess
);
3166
delete_mess
=
kFALSE
;
3167
rc
= 1;
3168
break
;
3169
3170
case
kPROOF_TOUCH
:
3171
// send a request for touching the remote admin file
3172
{
3173
sl
->Touch();
3174
}
3175
break
;
3176
3177
case
kPROOF_GETOBJECT
:
3178
// send slave object it asks for
3179
mess
->ReadString(str,
sizeof
(str));
3180
obj =
gDirectory
->Get(str);
3181
if
(obj)
3182
s->
SendObject
(obj);
3183
else
3184
s->
Send
(
kMESS_NOTOK
);
3185
break
;
3186
3187
case
kPROOF_GETPACKET
:
3188
{
3189
PDB
(kGlobal,2)
3190
Info
(
"HandleInputMessage"
,
"%s: kPROOF_GETPACKET"
,
sl
->GetOrdinal());
3191
TDSetElement
*
elem
= 0;
3192
elem
=
fPlayer
?
fPlayer
->
GetNextPacket
(
sl
,
mess
) : 0;
3193
3194
if
(
elem
!= (
TDSetElement
*) -1) {
3195
TMessage
answ
(
kPROOF_GETPACKET
);
3196
answ
<<
elem
;
3197
s->
Send
(
answ
);
3198
3199
while
(
fWaitingSlaves
!= 0 &&
fWaitingSlaves
->
GetSize
()) {
3200
TPair
*
p
= (
TPair
*)
fWaitingSlaves
->
First
();
3201
s = (
TSocket
*)
p
->Key();
3202
TMessage
*
m
= (
TMessage
*)
p
->Value();
3203
3204
elem
=
fPlayer
?
fPlayer
->
GetNextPacket
(
sl
,
m
) : 0;
3205
if
(
elem
!= (
TDSetElement
*) -1) {
3206
TMessage
a
(
kPROOF_GETPACKET
);
3207
a
<<
elem
;
3208
s->
Send
(
a
);
3209
// remove has to happen via Links because TPair does not have
3210
// a Compare() function and therefore RemoveFirst() and
3211
// Remove(TObject*) do not work
3212
fWaitingSlaves
->
Remove
(
fWaitingSlaves
->
FirstLink
());
3213
delete
p
;
3214
delete
m
;
3215
}
else
{
3216
break
;
3217
}
3218
}
3219
}
else
{
3220
if
(
fWaitingSlaves
== 0)
fWaitingSlaves
=
new
TList
;
3221
fWaitingSlaves
->
Add
(
new
TPair
(s,
mess
));
3222
delete_mess
=
kFALSE
;
3223
}
3224
}
3225
break
;
3226
3227
case
kPROOF_LOGFILE
:
3228
{
3229
Int_t
size
;
3230
(*mess) >>
size
;
3231
PDB
(kGlobal,2)
3232
Info
(
"HandleInputMessage"
,
"%s: kPROOF_LOGFILE: size: %d"
,
sl
->GetOrdinal(),
size
);
3233
RecvLogFile
(s,
size
);
3234
}
3235
break
;
3236
3237
case
kPROOF_LOGDONE
:
3238
(*mess) >>
sl
->fStatus >>
sl
->fParallel;
3239
PDB
(kCollect,2)
3240
Info
(
"HandleInputMessage"
,
"%s: kPROOF_LOGDONE: status %d parallel %d"
,
3241
sl
->GetOrdinal(),
sl
->fStatus,
sl
->fParallel);
3242
if
(
sl
->fStatus != 0) {
3243
// Return last nonzero status
3244
fStatus
=
sl
->fStatus;
3245
// Deactivate the worker, if required
3246
if
(
deactonfail
)
DeactivateWorker
(
sl
->fOrdinal);
3247
}
3248
// Remove from the workers-ready list
3249
if
(
fWrksOutputReady
&&
fWrksOutputReady
->
FindObject
(
sl
)) {
3250
sl
->ResetBit(
TSlave::kOutputRequested
);
3251
fWrksOutputReady
->
Remove
(
sl
);
3252
}
3253
rc
= 1;
3254
break
;
3255
3256
case
kPROOF_GETSTATS
:
3257
{
3258
(*mess) >>
sl
->fBytesRead >>
sl
->fRealTime >>
sl
->fCpuTime
3259
>>
sl
->fWorkDir >>
sl
->fProofWorkDir;
3260
PDB
(kCollect,2)
3261
Info
(
"HandleInputMessage"
,
"kPROOF_GETSTATS: %s"
,
sl
->fWorkDir.Data());
3262
TString
img
;
3263
if
((
mess
->BufferSize() >
mess
->Length()))
3264
(*mess) >>
img
;
3265
// Set image
3266
if
(
img
.IsNull()) {
3267
if
(
sl
->fImage.IsNull())
3268
sl
->fImage.Form(
"%s:%s"
,
TUrl
(
sl
->fName).
GetHostFQDN
(),
3269
sl
->fProofWorkDir.Data());
3270
}
else
{
3271
sl
->fImage =
img
;
3272
}
3273
PDB
(kGlobal,2)
3274
Info
(
"HandleInputMessage"
,
3275
"kPROOF_GETSTATS:%s image: %s"
,
sl
->GetOrdinal(),
sl
->GetImage());
3276
3277
fBytesRead
+=
sl
->fBytesRead;
3278
fRealTime
+=
sl
->fRealTime;
3279
fCpuTime
+=
sl
->fCpuTime;
3280
rc
= 1;
3281
}
3282
break
;
3283
3284
case
kPROOF_GETPARALLEL
:
3285
{
3286
Bool_t
async
=
kFALSE
;
3287
(*mess) >>
sl
->fParallel;
3288
if
((
mess
->BufferSize() >
mess
->Length()))
3289
(*mess) >>
async
;
3290
rc
= (
async
) ? 0 : 1;
3291
}
3292
break
;
3293
3294
case
kPROOF_CHECKFILE
:
3295
{
// New servers (>= 5.22) send the status
3296
if
((
mess
->BufferSize() >
mess
->Length())) {
3297
(*mess) >>
fCheckFileStatus
;
3298
}
else
{
3299
// Form old servers this meant success (failure was signaled with the
3300
// dangerous kPROOF_FATAL)
3301
fCheckFileStatus
= 1;
3302
}
3303
rc
= 1;
3304
}
3305
break
;
3306
3307
case
kPROOF_SENDFILE
:
3308
{
// New server: signals ending of sendfile operation
3309
rc
= 1;
3310
}
3311
break
;
3312
3313
case
kPROOF_PACKAGE_LIST
:
3314
{
3315
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_PACKAGE_LIST: enter"
);
3316
Int_t
type
= 0;
3317
(*mess) >>
type
;
3318
switch
(
type
) {
3319
case
TProof::kListEnabledPackages
:
3320
SafeDelete
(
fEnabledPackages
);
3321
fEnabledPackages
= (
TList
*)
mess
->ReadObject(
TList::Class
());
3322
if
(
fEnabledPackages
) {
3323
fEnabledPackages
->
SetOwner
();
3324
}
else
{
3325
Error
(
"HandleInputMessage"
,
3326
"kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!"
);
3327
}
3328
break
;
3329
case
TProof::kListPackages
:
3330
SafeDelete
(
fAvailablePackages
);
3331
fAvailablePackages
= (
TList
*)
mess
->ReadObject(
TList::Class
());
3332
if
(
fAvailablePackages
) {
3333
fAvailablePackages
->
SetOwner
();
3334
}
else
{
3335
Error
(
"HandleInputMessage"
,
3336
"kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!"
);
3337
}
3338
break
;
3339
default
:
3340
Error
(
"HandleInputMessage"
,
"kPROOF_PACKAGE_LIST: unknown type: %d"
,
type
);
3341
}
3342
}
3343
break
;
3344
3345
case
kPROOF_SENDOUTPUT
:
3346
{
3347
// We start measuring the merging time
3348
fPlayer
->
SetMerging
();
3349
3350
// Worker is ready to send output: make sure the relevant bit is reset
3351
sl
->ResetBit(
TSlave::kOutputRequested
);
3352
PDB
(kGlobal,2)
3353
Info
(
"HandleInputMessage"
,
"kPROOF_SENDOUTPUT: enter (%s)"
,
sl
->GetOrdinal());
3354
// Create the list if not yet done
3355
if
(!
fWrksOutputReady
) {
3356
fWrksOutputReady
=
new
TList
;
3357
fWrksOutputReady
->
SetOwner
(
kFALSE
);
3358
}
3359
fWrksOutputReady
->
Add
(
sl
);
3360
}
3361
break
;
3362
3363
case
kPROOF_OUTPUTOBJECT
:
3364
{
3365
// We start measuring the merging time
3366
fPlayer
->
SetMerging
();
3367
3368
PDB
(kGlobal,2)
3369
Info
(
"HandleInputMessage"
,
"kPROOF_OUTPUTOBJECT: enter"
);
3370
Int_t
type
= 0;
3371
const
char
*prefix =
gProofServ
?
gProofServ
->
GetPrefix
() :
"Lite-0"
;
3372
if
(!
TestBit
(
TProof::kIsClient
) && !
fMergersSet
&& !
fFinalizationRunning
) {
3373
Info
(
"HandleInputMessage"
,
"finalization on %s started ..."
, prefix);
3374
fFinalizationRunning
=
kTRUE
;
3375
}
3376
3377
while
((
mess
->BufferSize() >
mess
->Length())) {
3378
(*mess) >>
type
;
3379
// If a query result header, add it to the player list
3380
if
(
fPlayer
) {
3381
if
(
type
== 0) {
3382
// Retrieve query result instance (output list not filled)
3383
TQueryResult
*
pq
=
3384
(
TQueryResult
*)
mess
->ReadObject(
TQueryResult::Class
());
3385
if
(
pq
) {
3386
// Add query to the result list in TProofPlayer
3387
fPlayer
->
AddQueryResult
(
pq
);
3388
fPlayer
->
SetCurrentQuery
(
pq
);
3389
// And clear the output list, as we start merging a new set of results
3390
if
(
fPlayer
->
GetOutputList
())
3391
fPlayer
->
GetOutputList
()->Clear();
3392
// Add the unique query tag as TNamed object to the input list
3393
// so that it is available in TSelectors for monitoring
3394
TString
qid
=
TString::Format
(
"%s:%s"
,
pq
->GetTitle(),
pq
->GetName());
3395
if
(
fPlayer
->
GetInputList
()->FindObject(
"PROOF_QueryTag"
))
3396
fPlayer
->
GetInputList
()->Remove(
fPlayer
->
GetInputList
()->FindObject(
"PROOF_QueryTag"
));
3397
fPlayer
->
AddInput
(
new
TNamed
(
"PROOF_QueryTag"
,
qid
.Data()));
3398
}
else
{
3399
Warning
(
"HandleInputMessage"
,
"kPROOF_OUTPUTOBJECT: query result missing"
);
3400
}
3401
}
else
if
(
type
> 0) {
3402
// Read object
3403
TObject
*o =
mess
->ReadObject(
TObject::Class
());
3404
// Increment counter on the client side
3405
fMergePrg
.
IncreaseIdx
();
3406
TString
msg
;
3407
Bool_t
changed
=
kFALSE
;
3408
msg
.Form(
"%s: merging output objects ... %s"
, prefix,
fMergePrg
.
Export
(
changed
));
3409
if
(
gProofServ
) {
3410
gProofServ
->
SendAsynMessage
(
msg
.Data(),
kFALSE
);
3411
}
else
if
(
IsTty
() ||
changed
) {
3412
fprintf
(
stderr
,
"%s\r"
,
msg
.Data());
3413
}
3414
// Add or merge it
3415
if
((
fPlayer
->
AddOutputObject
(o) == 1)) {
3416
// Remove the object if it has been merged
3417
SafeDelete
(o);
3418
}
3419
if
(
type
> 1) {
3420
// Update the merger progress info
3421
fMergePrg
.
DecreaseNWrks
();
3422
if
(
TestBit
(
TProof::kIsClient
) && !
IsLite
()) {
3423
// In PROOFLite this has to be done once only in TProofLite::Process
3424
TQueryResult
*
pq
=
fPlayer
->
GetCurrentQuery
();
3425
if
(
pq
) {
3426
pq
->SetOutputList(
fPlayer
->
GetOutputList
(),
kFALSE
);
3427
// Add input objects (do not override remote settings, if any)
3428
TObject
*
xo
= 0;
3429
TIter
nxin
(
fPlayer
->
GetInputList
());
3430
// Servers prior to 5.28/00 do not create the input list in the TQueryResult
3431
if
(!
pq
->GetInputList())
pq
->SetInputList(
new
TList
());
3432
while
((
xo
=
nxin
()))
3433
if
(!
pq
->GetInputList()->FindObject(
xo
->GetName()))
3434
pq
->AddInput(
xo
->Clone());
3435
// If the last object, notify the GUI that the result arrived
3436
QueryResultReady
(
TString::Format
(
"%s:%s"
,
pq
->GetTitle(),
pq
->GetName()));
3437
}
3438
// Processing is over
3439
UpdateDialog
();
3440
}
3441
}
3442
}
3443
}
else
{
3444
Warning
(
"HandleInputMessage"
,
"kPROOF_OUTPUTOBJECT: player undefined!"
);
3445
}
3446
}
3447
}
3448
break
;
3449
3450
case
kPROOF_OUTPUTLIST
:
3451
{
3452
// We start measuring the merging time
3453
3454
PDB
(kGlobal,2)
3455
Info
(
"HandleInputMessage"
,
"%s: kPROOF_OUTPUTLIST: enter"
,
sl
->GetOrdinal());
3456
TList
*out = 0;
3457
if
(
fPlayer
) {
3458
fPlayer
->
SetMerging
();
3459
if
(
TestBit
(
TProof::kIsMaster
) ||
fProtocol
< 7) {
3460
out = (
TList
*)
mess
->ReadObject(
TList::Class
());
3461
}
else
{
3462
TQueryResult
*
pq
=
3463
(
TQueryResult
*)
mess
->ReadObject(
TQueryResult::Class
());
3464
if
(
pq
) {
3465
// Add query to the result list in TProofPlayer
3466
fPlayer
->
AddQueryResult
(
pq
);
3467
fPlayer
->
SetCurrentQuery
(
pq
);
3468
// To avoid accidental cleanups from anywhere else
3469
// remove objects from gDirectory and clone the list
3470
out =
pq
->GetOutputList();
3471
CleanGDirectory
(out);
3472
out = (
TList
*) out->Clone();
3473
// Notify the GUI that the result arrived
3474
QueryResultReady
(
TString::Format
(
"%s:%s"
,
pq
->GetTitle(),
pq
->GetName()));
3475
}
else
{
3476
PDB
(kGlobal,2)
3477
Info
(
"HandleInputMessage"
,
3478
"%s: kPROOF_OUTPUTLIST: query result missing"
,
sl
->GetOrdinal());
3479
}
3480
}
3481
if
(out) {
3482
out->SetOwner();
3483
fPlayer
->
AddOutput
(out);
// Incorporate the list
3484
SafeDelete
(out);
3485
}
else
{
3486
PDB
(kGlobal,2)
3487
Info
(
"HandleInputMessage"
,
3488
"%s: kPROOF_OUTPUTLIST: outputlist is empty"
,
sl
->GetOrdinal());
3489
}
3490
}
else
{
3491
Warning
(
"HandleInputMessage"
,
3492
"%s: kPROOF_OUTPUTLIST: player undefined!"
,
sl
->GetOrdinal());
3493
}
3494
// On clients at this point processing is over
3495
if
(
TestBit
(
TProof::kIsClient
) && !
IsLite
())
3496
UpdateDialog
();
3497
}
3498
break
;
3499
3500
case
kPROOF_QUERYLIST
:
3501
{
3502
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_QUERYLIST: enter"
);
3503
(*mess) >>
fOtherQueries
>>
fDrawQueries
;
3504
if
(
fQueries
) {
3505
fQueries
->
Delete
();
3506
delete
fQueries
;
3507
fQueries
= 0;
3508
}
3509
fQueries
= (
TList
*)
mess
->ReadObject(
TList::Class
());
3510
}
3511
break
;
3512
3513
case
kPROOF_RETRIEVE
:
3514
{
3515
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_RETRIEVE: enter"
);
3516
TQueryResult
*
pq
=
3517
(
TQueryResult
*)
mess
->ReadObject(
TQueryResult::Class
());
3518
if
(
pq
&&
fPlayer
) {
3519
fPlayer
->
AddQueryResult
(
pq
);
3520
// Notify the GUI that the result arrived
3521
QueryResultReady
(
TString::Format
(
"%s:%s"
,
pq
->GetTitle(),
pq
->GetName()));
3522
}
else
{
3523
PDB
(kGlobal,2)
3524
Info
(
"HandleInputMessage"
,
3525
"kPROOF_RETRIEVE: query result missing or player undefined"
);
3526
}
3527
}
3528
break
;
3529
3530
case
kPROOF_MAXQUERIES
:
3531
{
3532
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_MAXQUERIES: enter"
);
3533
Int_t
max = 0;
3534
3535
(*mess) >> max;
3536
Printf
(
"Number of queries fully kept remotely: %d"
, max);
3537
}
3538
break
;
3539
3540
case
kPROOF_SERVERSTARTED
:
3541
{
3542
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_SERVERSTARTED: enter"
);
3543
3544
UInt_t
tot
= 0,
done
= 0;
3545
TString
action
;
3546
Bool_t
st
=
kTRUE
;
3547
3548
(*mess) >>
action
>>
tot
>>
done
>>
st
;
3549
3550
if
(
TestBit
(
TProof::kIsClient
)) {
3551
if
(
tot
) {
3552
TString
type
= (
action
.Contains(
"submas"
)) ?
"submasters"
3553
:
"workers"
;
3554
Int_t
frac = (
Int_t
) (
done
*100.)/
tot
;
3555
char
msg
[512] = {0};
3556
if
(frac >= 100) {
3557
snprintf
(
msg
, 512,
"%s: OK (%d %s) \n"
,
3558
action
.Data(),
tot
,
type
.Data());
3559
}
else
{
3560
snprintf
(
msg
, 512,
"%s: %d out of %d (%d %%)\r"
,
3561
action
.Data(),
done
,
tot
, frac);
3562
}
3563
if
(
fSync
)
3564
fprintf
(
stderr
,
"%s"
,
msg
);
3565
else
3566
NotifyLogMsg
(
msg
, 0);
3567
}
3568
// Notify GUIs
3569
StartupMessage
(
action
.Data(),
st
, (
Int_t
)
done
, (
Int_t
)
tot
);
3570
}
else
{
3571
3572
// Just send the message one level up
3573
TMessage
m
(
kPROOF_SERVERSTARTED
);
3574
m
<<
action
<<
tot
<<
done
<<
st
;
3575
gProofServ
->
GetSocket
()->
Send
(
m
);
3576
}
3577
}
3578
break
;
3579
3580
case
kPROOF_DATASET_STATUS
:
3581
{
3582
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_DATASET_STATUS: enter"
);
3583
3584
UInt_t
tot
= 0,
done
= 0;
3585
TString
action
;
3586
Bool_t
st
=
kTRUE
;
3587
3588
(*mess) >>
action
>>
tot
>>
done
>>
st
;
3589
3590
if
(
TestBit
(
TProof::kIsClient
)) {
3591
if
(
tot
) {
3592
TString
type
=
"files"
;
3593
Int_t
frac = (
Int_t
) (
done
*100.)/
tot
;
3594
char
msg
[512] = {0};
3595
if
(frac >= 100) {
3596
snprintf
(
msg
, 512,
"%s: OK (%d %s) \n"
,
3597
action
.Data(),
tot
,
type
.Data());
3598
}
else
{
3599
snprintf
(
msg
, 512,
"%s: %d out of %d (%d %%)\r"
,
3600
action
.Data(),
done
,
tot
, frac);
3601
}
3602
if
(
fSync
)
3603
fprintf
(
stderr
,
"%s"
,
msg
);
3604
else
3605
NotifyLogMsg
(
msg
, 0);
3606
}
3607
// Notify GUIs
3608
DataSetStatus
(
action
.Data(),
st
, (
Int_t
)
done
, (
Int_t
)
tot
);
3609
}
else
{
3610
3611
// Just send the message one level up
3612
TMessage
m
(
kPROOF_DATASET_STATUS
);
3613
m
<<
action
<<
tot
<<
done
<<
st
;
3614
gProofServ
->
GetSocket
()->
Send
(
m
);
3615
}
3616
}
3617
break
;
3618
3619
case
kPROOF_STARTPROCESS
:
3620
{
3621
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_STARTPROCESS: enter"
);
3622
3623
// For Proof-Lite this variable is the number of workers and is set
3624
// by the player
3625
if
(!
IsLite
()) {
3626
fNotIdle
= 1;
3627
fIsWaiting
=
kFALSE
;
3628
}
3629
3630
// Redirect the output, if needed
3631
fRedirLog
= (
fSync
) ?
fRedirLog
:
kTRUE
;
3632
3633
// The signal is used on masters by XrdProofdProtocol to catch
3634
// the start of processing; on clients it allows to update the
3635
// progress dialog
3636
if
(!
TestBit
(
TProof::kIsMaster
)) {
3637
3638
// This is the end of preparation
3639
fQuerySTW
.
Stop
();
3640
fPrepTime
=
fQuerySTW
.
RealTime
();
3641
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"Preparation time: %f s"
,
fPrepTime
);
3642
3643
TString
selec
;
3644
Int_t
dsz
= -1;
3645
Long64_t
first = -1,
nent
= -1;
3646
(*mess) >>
selec
>>
dsz
>> first >>
nent
;
3647
// Start or reset the progress dialog
3648
if
(!
gROOT
->IsBatch()) {
3649
if
(
fProgressDialog
&&
3650
!
TestBit
(
kUsingSessionGui
) &&
TestBit
(
kUseProgressDialog
)) {
3651
if
(!
fProgressDialogStarted
) {
3652
fProgressDialog
->
ExecPlugin
(5,
this
,
3653
selec
.Data(),
dsz
, first,
nent
);
3654
fProgressDialogStarted
=
kTRUE
;
3655
}
else
{
3656
ResetProgressDialog
(
selec
,
dsz
, first,
nent
);
3657
}
3658
}
3659
ResetBit
(
kUsingSessionGui
);
3660
}
3661
}
3662
}
3663
break
;
3664
3665
case
kPROOF_ENDINIT
:
3666
{
3667
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_ENDINIT: enter"
);
3668
3669
if
(
TestBit
(
TProof::kIsMaster
)) {
3670
if
(
fPlayer
)
3671
fPlayer
->
SetInitTime
();
3672
}
3673
}
3674
break
;
3675
3676
case
kPROOF_SETIDLE
:
3677
{
3678
PDB
(kGlobal,2)
3679
Info
(
"HandleInputMessage"
,
"kPROOF_SETIDLE from '%s': enter (%d)"
,
sl
->GetOrdinal(),
fNotIdle
);
3680
3681
// The session is idle
3682
if
(
IsLite
()) {
3683
if
(
fNotIdle
> 0) {
3684
fNotIdle
--;
3685
PDB
(kGlobal,2)
3686
Info
(
"HandleInputMessage"
,
"%s: got kPROOF_SETIDLE"
,
sl
->GetOrdinal());
3687
}
else
{
3688
Warning
(
"HandleInputMessage"
,
3689
"%s: got kPROOF_SETIDLE but no running workers ! protocol error?"
,
3690
sl
->GetOrdinal());
3691
}
3692
}
else
{
3693
fNotIdle
= 0;
3694
// Check if the query has been enqueued
3695
if
((
mess
->BufferSize() >
mess
->Length()))
3696
(*mess) >>
fIsWaiting
;
3697
}
3698
}
3699
break
;
3700
3701
case
kPROOF_QUERYSUBMITTED
:
3702
{
3703
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_QUERYSUBMITTED: enter"
);
3704
3705
// We have received the sequential number
3706
(*mess) >>
fSeqNum
;
3707
Bool_t
sync
=
fSync
;
3708
if
((
mess
->BufferSize() >
mess
->Length()))
3709
(*mess) >>
sync
;
3710
if
(
sync
!=
fSync
&&
fSync
) {
3711
// The server required to switch to asynchronous mode
3712
Activate
();
3713
fSync
=
kFALSE
;
3714
}
3715
DisableGoAsyn
();
3716
// Check if the query has been enqueued
3717
fIsWaiting
=
kTRUE
;
3718
// For Proof-Lite this variable is the number of workers and is set by the player
3719
if
(!
IsLite
())
3720
fNotIdle
= 1;
3721
3722
rc
= 1;
3723
}
3724
break
;
3725
3726
case
kPROOF_SESSIONTAG
:
3727
{
3728
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_SESSIONTAG: enter"
);
3729
3730
// We have received the unique tag and save it as name of this object
3731
TString
stag
;
3732
(*mess) >>
stag
;
3733
SetName
(
stag
);
3734
// In the TSlave object
3735
sl
->SetSessionTag(
stag
);
3736
// Server may have also sent the group
3737
if
((
mess
->BufferSize() >
mess
->Length()))
3738
(*mess) >>
fGroup
;
3739
// Server may have also sent the user
3740
if
((
mess
->BufferSize() >
mess
->Length())) {
3741
TString
usr
;
3742
(*mess) >>
usr
;
3743
if
(!
usr
.IsNull())
fUrl
.
SetUser
(
usr
.Data());
3744
}
3745
}
3746
break
;
3747
3748
case
kPROOF_FEEDBACK
:
3749
{
3750
PDB
(kGlobal,2)
3751
Info
(
"HandleInputMessage"
,
"kPROOF_FEEDBACK: enter"
);
3752
TList
*out = (
TList
*)
mess
->ReadObject(
TList::Class
());
3753
out->SetOwner();
3754
if
(
fPlayer
)
3755
fPlayer
->
StoreFeedback
(
sl
, out);
// Adopts the list
3756
else
3757
// Not yet ready: stop collect asap
3758
rc
= 1;
3759
}
3760
break
;
3761
3762
case
kPROOF_AUTOBIN
:
3763
{
3764
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_AUTOBIN: enter"
);
3765
3766
TString
name
;
3767
Double_t
xmin
,
xmax
,
ymin
,
ymax
, zmin, zmax;
3768
3769
(*mess) >>
name
>>
xmin
>>
xmax
>>
ymin
>>
ymax
>> zmin >> zmax;
3770
3771
if
(
fPlayer
)
fPlayer
->
UpdateAutoBin
(
name
,
xmin
,
xmax
,
ymin
,
ymax
,zmin,zmax);
3772
3773
TMessage
answ
(
kPROOF_AUTOBIN
);
3774
3775
answ
<<
name
<<
xmin
<<
xmax
<<
ymin
<<
ymax
<< zmin << zmax;
3776
3777
s->
Send
(
answ
);
3778
}
3779
break
;
3780
3781
case
kPROOF_PROGRESS
:
3782
{
3783
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_PROGRESS: enter"
);
3784
3785
if
(
GetRemoteProtocol
() > 25) {
3786
// New format
3787
TProofProgressInfo
*pi = 0;
3788
(*mess) >> pi;
3789
fPlayer
->
Progress
(
sl
,pi);
3790
}
else
if
(
GetRemoteProtocol
() > 11) {
3791
Long64_t
total
,
processed
,
bytesread
;
3792
Float_t
initTime
,
procTime
,
evtrti
,
mbrti
;
3793
(*mess) >>
total
>>
processed
>>
bytesread
3794
>>
initTime
>>
procTime
3795
>>
evtrti
>>
mbrti
;
3796
if
(
fPlayer
)
3797
fPlayer
->
Progress
(
sl
,
total
,
processed
,
bytesread
,
3798
initTime
,
procTime
,
evtrti
,
mbrti
);
3799
3800
}
else
{
3801
// Old format
3802
Long64_t
total
,
processed
;
3803
(*mess) >>
total
>>
processed
;
3804
if
(
fPlayer
)
3805
fPlayer
->
Progress
(
sl
,
total
,
processed
);
3806
}
3807
}
3808
break
;
3809
3810
case
kPROOF_STOPPROCESS
:
3811
{
3812
// This message is sent from a worker that finished processing.
3813
// We determine whether it was asked to finish by the
3814
// packetizer or stopped during processing a packet
3815
// (by TProof::RemoveWorkers() or by an external signal).
3816
// In the later case call packetizer->MarkBad.
3817
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_STOPPROCESS: enter"
);
3818
3819
Long64_t
events = 0;
3820
Bool_t
abort =
kFALSE
;
3821
TProofProgressStatus
*status = 0;
3822
3823
if
((
mess
->BufferSize() >
mess
->Length()) && (
fProtocol
> 18)) {
3824
(*mess) >> status >> abort;
3825
}
else
if
((
mess
->BufferSize() >
mess
->Length()) && (
fProtocol
> 8)) {
3826
(*mess) >> events >> abort;
3827
}
else
{
3828
(*mess) >> events;
3829
}
3830
if
(
fPlayer
) {
3831
if
(
fProtocol
> 18) {
3832
TList
*
listOfMissingFiles
= 0;
3833
if
(!(
listOfMissingFiles
= (
TList
*)
GetOutput
(
"MissingFiles"
))) {
3834
listOfMissingFiles
=
new
TList
();
3835
listOfMissingFiles
->SetName(
"MissingFiles"
);
3836
if
(
fPlayer
)
3837
fPlayer
->
AddOutputObject
(
listOfMissingFiles
);
3838
}
3839
if
(
fPlayer
->
GetPacketizer
()) {
3840
Int_t
ret
=
3841
fPlayer
->
GetPacketizer
()->AddProcessed(
sl
, status, 0, &
listOfMissingFiles
);
3842
if
(
ret
> 0)
3843
fPlayer
->
GetPacketizer
()->MarkBad(
sl
, status, &
listOfMissingFiles
);
3844
// This object is now owned by the packetizer
3845
status = 0;
3846
}
3847
if
(status)
fPlayer
->
AddEventsProcessed
(status->
GetEntries
());
3848
}
else
{
3849
fPlayer
->
AddEventsProcessed
(events);
3850
}
3851
}
3852
SafeDelete
(status);
3853
if
(!
TestBit
(
TProof::kIsMaster
))
3854
Emit
(
"StopProcess(Bool_t)"
, abort);
3855
break
;
3856
}
3857
3858
case
kPROOF_SUBMERGER
:
3859
{
3860
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_SUBMERGER: enter"
);
3861
HandleSubmerger
(
mess
,
sl
);
3862
}
3863
break
;
3864
3865
case
kPROOF_GETSLAVEINFO
:
3866
{
3867
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_GETSLAVEINFO: enter"
);
3868
3869
Bool_t
active = (
GetListOfActiveSlaves
()->
FindObject
(
sl
) != 0);
3870
Bool_t
bad
= (
GetListOfBadSlaves
()->
FindObject
(
sl
) != 0);
3871
TList
*
tmpinfo
= 0;
3872
(*mess) >>
tmpinfo
;
3873
if
(
tmpinfo
== 0) {
3874
Error
(
"HandleInputMessage"
,
"kPROOF_GETSLAVEINFO: no list received!"
);
3875
}
else
{
3876
tmpinfo
->SetOwner(
kFALSE
);
3877
Int_t
nentries
=
tmpinfo
->GetSize();
3878
for
(
Int_t
i=0; i<
nentries
; i++) {
3879
TSlaveInfo
*
slinfo
=
3880
dynamic_cast<
TSlaveInfo
*
>
(
tmpinfo
->At(i));
3881
if
(
slinfo
) {
3882
// If PROOF-Lite
3883
if
(
IsLite
())
slinfo
->fHostName =
gSystem
->
HostName
();
3884
// Check if we have already a instance for this worker
3885
TIter
nxw
(
fSlaveInfo
);
3886
TSlaveInfo
*
ourwi
= 0;
3887
while
((
ourwi
= (
TSlaveInfo
*)
nxw
())) {
3888
if
(!
strcmp
(
ourwi
->GetOrdinal(),
slinfo
->GetOrdinal())) {
3889
ourwi
->SetSysInfo(
slinfo
->GetSysInfo());
3890
ourwi
->fHostName =
slinfo
->GetName();
3891
if
(
slinfo
->GetDataDir() && (
strlen
(
slinfo
->GetDataDir()) > 0))
3892
ourwi
->fDataDir =
slinfo
->GetDataDir();
3893
break
;
3894
}
3895
}
3896
if
(!
ourwi
) {
3897
fSlaveInfo
->
Add
(
slinfo
);
3898
}
else
{
3899
slinfo
=
ourwi
;
3900
}
3901
if
(
slinfo
->fStatus !=
TSlaveInfo::kBad
) {
3902
if
(!active)
slinfo
->SetStatus(
TSlaveInfo::kNotActive
);
3903
if
(
bad
)
slinfo
->SetStatus(
TSlaveInfo::kBad
);
3904
}
3905
if
(
sl
->GetMsd() && (
strlen
(
sl
->GetMsd()) > 0))
3906
slinfo
->fMsd =
sl
->GetMsd();
3907
}
3908
}
3909
delete
tmpinfo
;
3910
rc
= 1;
3911
}
3912
}
3913
break
;
3914
3915
case
kPROOF_VALIDATE_DSET
:
3916
{
3917
PDB
(kGlobal,2)
3918
Info
(
"HandleInputMessage"
,
"kPROOF_VALIDATE_DSET: enter"
);
3919
TDSet
*
dset
= 0;
3920
(*mess) >>
dset
;
3921
if
(!
fDSet
)
3922
Error
(
"HandleInputMessage"
,
"kPROOF_VALIDATE_DSET: fDSet not set"
);
3923
else
3924
fDSet
->
Validate
(
dset
);
3925
delete
dset
;
3926
}
3927
break
;
3928
3929
case
kPROOF_DATA_READY
:
3930
{
3931
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_DATA_READY: enter"
);
3932
Bool_t
dataready
=
kFALSE
;
3933
Long64_t
totalbytes
,
bytesready
;
3934
(*mess) >>
dataready
>>
totalbytes
>>
bytesready
;
3935
fTotalBytes
+=
totalbytes
;
3936
fBytesReady
+=
bytesready
;
3937
if
(
dataready
==
kFALSE
)
fDataReady
=
dataready
;
3938
}
3939
break
;
3940
3941
case
kPROOF_PING
:
3942
// do nothing (ping is already acknowledged)
3943
break
;
3944
3945
case
kPROOF_MESSAGE
:
3946
{
3947
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_MESSAGE: enter"
);
3948
3949
// We have received the unique tag and save it as name of this object
3950
TString
msg
;
3951
(*mess) >>
msg
;
3952
Bool_t
lfeed
=
kTRUE
;
3953
if
((
mess
->BufferSize() >
mess
->Length()))
3954
(*mess) >>
lfeed
;
3955
3956
if
(
TestBit
(
TProof::kIsClient
)) {
3957
3958
if
(
fSync
) {
3959
// Notify locally
3960
fprintf
(
stderr
,
"%s%c"
,
msg
.Data(), (
lfeed
?
'\n'
:
'\r'
));
3961
}
else
{
3962
// Notify locally taking care of redirection, windows logs, ...
3963
NotifyLogMsg
(
msg
, (
lfeed
?
"\n"
:
"\r"
));
3964
}
3965
}
else
{
3966
3967
// The message is logged for debugging purposes.
3968
fprintf
(
stderr
,
"%s%c"
,
msg
.Data(), (
lfeed
?
'\n'
:
'\r'
));
3969
if
(
gProofServ
) {
3970
// We hide it during normal operations
3971
gProofServ
->
FlushLogFile
();
3972
3973
// And send the message one level up
3974
gProofServ
->
SendAsynMessage
(
msg
,
lfeed
);
3975
}
3976
}
3977
}
3978
break
;
3979
3980
case
kPROOF_VERSARCHCOMP
:
3981
{
3982
TString
vac
;
3983
(*mess) >>
vac
;
3984
PDB
(kGlobal,2)
Info
(
"HandleInputMessage"
,
"kPROOF_VERSARCHCOMP: %s"
,
vac
.Data());
3985
Int_t
from = 0;
3986
TString
vers
,
archcomp
;
3987
if
(
vac
.Tokenize(
vers
, from,
"|"
))
3988
vac
.Tokenize(
archcomp
, from,
"|"
);
3989
sl
->SetArchCompiler(
archcomp
);
3990
vers
.ReplaceAll(
":"
,
"|"
);
3991
sl
->SetROOTVersion(
vers
);
3992
}
3993
break
;
3994
3995
default
:
3996
{
3997
Error
(
"HandleInputMessage"
,
"unknown command received from '%s' (what = %d)"
,
3998
sl
->GetOrdinal(),
what
);
3999
}
4000
break
;
4001
}
4002
4003
// Cleanup
4004
if
(
delete_mess
)
4005
delete
mess
;
4006
4007
// We are done successfully
4008
return
rc
;
4009
}
4010
4011
////////////////////////////////////////////////////////////////////////////////
4012
/// Process a message of type kPROOF_SUBMERGER
4013
4014
void
TProof::HandleSubmerger
(
TMessage
*
mess
,
TSlave
*
sl
)
4015
{
4016
// Message sub-type
4017
Int_t
type
= 0;
4018
(*mess) >>
type
;
4019
TSocket
*s =
sl
->GetSocket();
4020
4021
switch
(
type
) {
4022
case
kOutputSent
:
4023
{
4024
if
(
IsEndMaster
()) {
4025
Int_t
merger_id
= -1;
4026
(*mess) >>
merger_id
;
4027
4028
PDB
(kSubmerger, 2)
4029
Info
(
"HandleSubmerger"
,
"kOutputSent: Worker %s:%d:%s had sent its output to merger #%d"
,
4030
sl
->GetName(),
sl
->GetPort(),
sl
->GetOrdinal(),
merger_id
);
4031
4032
if
(!
fMergers
||
fMergers
->
GetSize
() <=
merger_id
) {
4033
Error
(
"HandleSubmerger"
,
"kOutputSize: #%d not in list "
,
merger_id
);
4034
break
;
4035
}
4036
TMergerInfo
*
mi
= (
TMergerInfo
*)
fMergers
->
At
(
merger_id
);
4037
mi
->SetMergedWorker();
4038
if
(
mi
->AreAllWorkersMerged()) {
4039
mi
->Deactivate();
4040
if
(
GetActiveMergersCount
() == 0) {
4041
fMergers
->
Clear
();
4042
delete
fMergers
;
4043
fMergersSet
=
kFALSE
;
4044
fMergersCount
= -1;
4045
fLastAssignedMerger
= 0;
4046
PDB
(kSubmerger, 2)
Info
(
"HandleSubmerger"
,
"all mergers removed ... "
);
4047
}
4048
}
4049
}
else
{
4050
PDB
(kSubmerger, 2)
Error
(
"HandleSubmerger"
,
"kOutputSent: received not on endmaster!"
);
4051
}
4052
}
4053
break
;
4054
4055
case
kMergerDown
:
4056
{
4057
Int_t
merger_id
= -1;
4058
(*mess) >>
merger_id
;
4059
4060
PDB
(kSubmerger, 2)
Info
(
"HandleSubmerger"
,
"kMergerDown: #%d "
,
merger_id
);
4061
4062
if
(!
fMergers
||
fMergers
->
GetSize
() <=
merger_id
) {
4063
Error
(
"HandleSubmerger"
,
"kMergerDown: #%d not in list "
,
merger_id
);
4064
break
;
4065
}
4066
4067
TMergerInfo
*
mi
= (
TMergerInfo
*)
fMergers
->
At
(
merger_id
);
4068
if
(!
mi
->IsActive()) {
4069
break
;
4070
}
else
{
4071
mi
->Deactivate();
4072
}
4073
4074
// Stop the invalid merger in the case it is still listening
4075
TMessage
stop(
kPROOF_SUBMERGER
);
4076
stop <<
Int_t
(
kStopMerging
);
4077
stop << 0;
4078
s->
Send
(stop);
4079
4080
// Ask for results from merger (only original results from this node as worker are returned)
4081
AskForOutput
(
mi
->GetMerger());
4082
4083
// Ask for results from all workers assigned to this merger
4084
TIter
nxo
(
mi
->GetWorkers());
4085
TObject
* o = 0;
4086
while
((o =
nxo
())) {
4087
AskForOutput
((
TSlave
*)o);
4088
}
4089
PDB
(kSubmerger, 2)
Info
(
"HandleSubmerger"
,
"kMergerDown:%d: exit"
,
merger_id
);
4090
}
4091
break
;
4092
4093
case
kOutputSize
:
4094
{
4095
if
(
IsEndMaster
()) {
4096
PDB
(kSubmerger, 2)
4097
Info
(
"HandleSubmerger"
,
"worker %s reported as finished "
,
sl
->GetOrdinal());
4098
4099
const
char
*prefix =
gProofServ
?
gProofServ
->
GetPrefix
() :
"Lite-0"
;
4100
if
(!
fFinalizationRunning
) {
4101
Info
(
"HandleSubmerger"
,
"finalization on %s started ..."
, prefix);
4102
fFinalizationRunning
=
kTRUE
;
4103
}
4104
4105
Int_t
output_size
= 0;
4106
Int_t
merging_port
= 0;
4107
(*mess) >>
output_size
>>
merging_port
;
4108
4109
PDB
(kSubmerger, 2)
Info
(
"HandleSubmerger"
,
4110
"kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)"
,
4111
sl
->GetName(),
sl
->GetPort(),
sl
->GetOrdinal(),
output_size
,
merging_port
);
4112
TString
msg
;
4113
if
(!
fMergersSet
) {
4114
4115
Int_t
activeWorkers
=
fCurrentMonitor
?
fCurrentMonitor
->
GetActive
() :
GetNumberOfActiveSlaves
();
4116
4117
// First pass - setting number of mergers according to user or dynamically
4118
fMergersCount
= -1;
// No mergers used if not set by user
4119
TParameter<Int_t>
*
mc
=
dynamic_cast<
TParameter<Int_t>
*
>
(
GetParameter
(
"PROOF_UseMergers"
));
4120
if
(
mc
)
fMergersCount
=
mc
->GetVal();
// Value set by user
4121
TParameter<Int_t>
*
mh
=
dynamic_cast<
TParameter<Int_t>
*
>
(
GetParameter
(
"PROOF_MergersByHost"
));
4122
if
(
mh
)
fMergersByHost
= (
mh
->GetVal() != 0) ?
kTRUE
:
kFALSE
;
// Assign submergers by hostname
4123
4124
// Mergers count specified by user but not valid
4125
if
(
fMergersCount
< 0 || (
fMergersCount
> (
activeWorkers
/2) )) {
4126
msg
.Form(
"%s: Invalid request: cannot start %d mergers for %d workers"
,
4127
prefix,
fMergersCount
,
activeWorkers
);
4128
if
(
gProofServ
)
4129
gProofServ
->
SendAsynMessage
(
msg
);
4130
else
4131
Printf
(
"%s"
,
msg
.Data());
4132
fMergersCount
= 0;
4133
}
4134
// Mergers count will be set dynamically
4135
if
((
fMergersCount
== 0) && (!
fMergersByHost
)) {
4136
if
(
activeWorkers
> 1) {
4137
fMergersCount
=
TMath::Nint
(
TMath::Sqrt
(
activeWorkers
));
4138
if
(
activeWorkers
/
fMergersCount
< 2)
4139
fMergersCount
= (
Int_t
)
TMath::Sqrt
(
activeWorkers
);
4140
}
4141
if
(
fMergersCount
> 1)
4142
msg
.Form(
"%s: Number of mergers set dynamically to %d (for %d workers)"
,
4143
prefix,
fMergersCount
,
activeWorkers
);
4144
else
{
4145
msg
.Form(
"%s: No mergers will be used for %d workers"
,
4146
prefix,
activeWorkers
);
4147
fMergersCount
= -1;
4148
}
4149
if
(
gProofServ
)
4150
gProofServ
->
SendAsynMessage
(
msg
);
4151
else
4152
Printf
(
"%s"
,
msg
.Data());
4153
}
else
if
(
fMergersByHost
) {
4154
// We force mergers at host level to minimize network traffic
4155
if
(
activeWorkers
> 1) {
4156
fMergersCount
= 0;
4157
THashList
hosts
;
4158
TIter
nxwk
(
fSlaves
);
4159
TObject
*
wrk
= 0;
4160
while
((
wrk
=
nxwk
())) {
4161
if
(!
hosts
.FindObject(
wrk
->GetName())) {
4162
hosts
.Add(
new
TObjString
(
wrk
->GetName()));
4163
fMergersCount
++;
4164
}
4165
}
4166
}
4167
if
(
fMergersCount
> 1)
4168
msg
.Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host"
,
4169
prefix,
fMergersCount
,
activeWorkers
);
4170
else
{
4171
msg
.Form(
"%s: No mergers will be used for %d workers"
,
4172
prefix,
activeWorkers
);
4173
fMergersCount
= -1;
4174
}
4175
if
(
gProofServ
)
4176
gProofServ
->
SendAsynMessage
(
msg
);
4177
else
4178
Printf
(
"%s"
,
msg
.Data());
4179
}
else
{
4180
msg
.Form(
"%s: Number of mergers set by user to %d (for %d workers)"
,
4181
prefix,
fMergersCount
,
activeWorkers
);
4182
if
(
gProofServ
)
4183
gProofServ
->
SendAsynMessage
(
msg
);
4184
else
4185
Printf
(
"%s"
,
msg
.Data());
4186
}
4187
4188
// We started merging; we call it here because fMergersCount is still the original number
4189
// and can be saved internally
4190
fPlayer
->
SetMerging
(
kTRUE
);
4191
4192
// Update merger counters (new workers are not yet active)
4193
fMergePrg
.
SetNWrks
(
fMergersCount
);
4194
4195
if
(
fMergersCount
> 0) {
4196
4197
fMergers
=
new
TList
();
4198
fLastAssignedMerger
= 0;
4199
// Total number of workers, which will not act as mergers ('pure workers')
4200
fWorkersToMerge
= (
activeWorkers
-
fMergersCount
);
4201
// Establish the first merger
4202
if
(!
CreateMerger
(
sl
,
merging_port
)) {
4203
// Cannot establish first merger
4204
AskForOutput
(
sl
);
4205
fWorkersToMerge
--;
4206
fMergersCount
--;
4207
}
4208
if
(
IsLite
())
fMergePrg
.
SetNWrks
(
fMergersCount
);
4209
}
else
{
4210
AskForOutput
(
sl
);
4211
}
4212
fMergersSet
=
kTRUE
;
4213
}
else
{
4214
// Multiple pass
4215
if
(
fMergersCount
== -1) {
4216
// No mergers. Workers send their outputs directly to master
4217
AskForOutput
(
sl
);
4218
}
else
{
4219
if
((
fRedirectNext
> 0 ) && (!
fMergersByHost
)) {
4220
RedirectWorker
(s,
sl
,
output_size
);
4221
fRedirectNext
--;
4222
}
else
{
4223
Bool_t
newMerger
=
kTRUE
;
4224
if
(
fMergersByHost
) {
4225
TIter
nxmg
(
fMergers
);
4226
TMergerInfo
*
mgi
= 0;
4227
while
((
mgi
= (
TMergerInfo
*)
nxmg
())) {
4228
if
(!
strcmp
(
sl
->GetName(),
mgi
->GetMerger()->GetName())) {
4229
newMerger
=
kFALSE
;
4230
break
;
4231
}
4232
}
4233
}
4234
if
((
fMergersCount
>
fMergers
->
GetSize
()) &&
newMerger
) {
4235
// Still not enough mergers established
4236
if
(!
CreateMerger
(
sl
,
merging_port
)) {
4237
// Cannot establish a merger
4238
AskForOutput
(
sl
);
4239
fWorkersToMerge
--;
4240
fMergersCount
--;
4241
}
4242
}
else
4243
RedirectWorker
(s,
sl
,
output_size
);
4244
}
4245
}
4246
}
4247
}
else
{
4248
Error
(
"HandleSubMerger"
,
"kOutputSize received not on endmaster!"
);
4249
}
4250
}
4251
break
;
4252
}
4253
}
4254
4255
////////////////////////////////////////////////////////////////////////////////
4256
/// Redirect output of worker sl to some merger
4257
4258
void
TProof::RedirectWorker
(
TSocket
*s,
TSlave
*
sl
,
Int_t
output_size
)
4259
{
4260
Int_t
merger_id
= -1;
4261
4262
if
(
fMergersByHost
) {
4263
for
(
Int_t
i = 0; i <
fMergers
->
GetSize
(); i++) {
4264
TMergerInfo
*
mgi
= (
TMergerInfo
*)
fMergers
->
At
(i);
4265
if
(!
strcmp
(
sl
->GetName(),
mgi
->GetMerger()->GetName())) {
4266
merger_id
= i;
4267
break
;
4268
}
4269
}
4270
}
else
{
4271
merger_id
=
FindNextFreeMerger
();
4272
}
4273
4274
if
(
merger_id
== -1) {
4275
// No free merger (probably it had crashed before)
4276
AskForOutput
(
sl
);
4277
}
else
{
4278
TMessage
sendoutput
(
kPROOF_SUBMERGER
);
4279
sendoutput
<<
Int_t
(
kSendOutput
);
4280
PDB
(kSubmerger, 2)
4281
Info
(
"RedirectWorker"
,
"redirecting worker %s to merger %d"
,
sl
->GetOrdinal(),
merger_id
);
4282
4283
PDB
(kSubmerger, 2)
Info
(
"RedirectWorker"
,
"redirecting output to merger #%d"
,
merger_id
);
4284
if
(!
fMergers
||
fMergers
->
GetSize
() <=
merger_id
) {
4285
Error
(
"RedirectWorker"
,
"#%d not in list "
,
merger_id
);
4286
return
;
4287
}
4288
TMergerInfo
*
mi
= (
TMergerInfo
*)
fMergers
->
At
(
merger_id
);
4289
4290
TString
hname
= (
IsLite
()) ?
"localhost"
:
mi
->GetMerger()->GetName();
4291
sendoutput
<<
merger_id
;
4292
sendoutput
<<
hname
;
4293
sendoutput
<<
mi
->GetPort();
4294
s->
Send
(
sendoutput
);
4295
mi
->AddMergedObjects(
output_size
);
4296
mi
->AddWorker(
sl
);
4297
}
4298
}
4299
4300
////////////////////////////////////////////////////////////////////////////////
4301
/// Return a merger, which is both active and still accepts some workers to be
4302
/// assigned to it. It works on the 'round-robin' basis.
4303
4304
Int_t
TProof::FindNextFreeMerger
()
4305
{
4306
while
(
fLastAssignedMerger < fMergers->
GetSize() &&
4307
(!((
TMergerInfo
*)
fMergers
->
At
(
fLastAssignedMerger
))->IsActive() ||
4308
((
TMergerInfo
*)
fMergers
->
At
(
fLastAssignedMerger
))->AreAllWorkersAssigned())) {
4309
fLastAssignedMerger
++;
4310
}
4311
4312
if
(
fLastAssignedMerger
==
fMergers
->
GetSize
()) {
4313
fLastAssignedMerger
= 0;
4314
}
else
{
4315
return
fLastAssignedMerger
++;
4316
}
4317
4318
while
(
fLastAssignedMerger < fMergers->
GetSize() &&
4319
(!((
TMergerInfo
*)
fMergers
->
At
(
fLastAssignedMerger
))->IsActive() ||
4320
((
TMergerInfo
*)
fMergers
->
At
(
fLastAssignedMerger
))->AreAllWorkersAssigned())) {
4321
fLastAssignedMerger
++;
4322
}
4323
4324
if
(
fLastAssignedMerger
==
fMergers
->
GetSize
()) {
4325
return
-1;
4326
}
else
{
4327
return
fLastAssignedMerger
++;
4328
}
4329
}
4330
4331
////////////////////////////////////////////////////////////////////////////////
4332
/// Master asks for output from worker sl
4333
4334
void
TProof::AskForOutput
(
TSlave
*
sl
)
4335
{
4336
TMessage
sendoutput
(
kPROOF_SUBMERGER
);
4337
sendoutput
<<
Int_t
(
kSendOutput
);
4338
4339
PDB
(kSubmerger, 2)
Info
(
"AskForOutput"
,
4340
"worker %s was asked to send its output to master"
,
4341
sl
->GetOrdinal());
4342
4343
sendoutput
<< -1;
4344
sendoutput
<<
TString
(
"master"
);
4345
sendoutput
<< -1;
4346
sl
->GetSocket()->Send(
sendoutput
);
4347
if
(
IsLite
())
fMergePrg
.
IncreaseNWrks
();
4348
}
4349
4350
////////////////////////////////////////////////////////////////////////////////
4351
/// Final update of the progress dialog
4352
4353
void
TProof::UpdateDialog
()
4354
{
4355
if
(!
fPlayer
)
return
;
4356
4357
// Handle abort ...
4358
if
(
fPlayer
->
GetExitStatus
() ==
TVirtualProofPlayer::kAborted
) {
4359
if
(
fSync
)
4360
Info
(
"UpdateDialog"
,
4361
"processing was aborted - %lld events processed"
,
4362
fPlayer
->
GetEventsProcessed
());
4363
4364
if
(
GetRemoteProtocol
() > 11) {
4365
// New format
4366
Progress
(-1,
fPlayer
->
GetEventsProcessed
(), -1, -1., -1., -1., -1.);
4367
}
else
{
4368