Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TPacketizer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 18/03/02
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TPacketizer
13\ingroup proofkernel
14
15This class generates packets to be processed on PROOF worker servers.
16A packet is an event range (begin entry and number of entries) or
17object range (first object and number of objects) in a TTree
18(entries) or a directory (objects) in a file.
19Packets are generated taking into account the performance of the
20remote machine, the time it took to process a previous packet on
21the remote machine, the locality of the database files, etc.
22
23*/
24
25#include "TPacketizer.h"
26
27#include "Riostream.h"
28#include "TDSet.h"
29#include "TEnv.h"
30#include "TError.h"
31#include "TEventList.h"
32#include "TEntryList.h"
33#include "TMap.h"
34#include "TMessage.h"
35#include "TMonitor.h"
36#include "TNtupleD.h"
37#include "TObject.h"
38#include "TParameter.h"
39#include "TPerfStats.h"
40#include "TProofDebug.h"
41#include "TProof.h"
42#include "TProofPlayer.h"
43#include "TProofServ.h"
44#include "TSlave.h"
45#include "TSocket.h"
46#include "TTimer.h"
47#include "TUrl.h"
48#include "TClass.h"
49#include "TMath.h"
50
51//
52// The following three utility classes manage the state of the
53// work to be performed and the slaves involved in the process.
54// A list of TFileNode(s) describes the hosts with files, each
55// has a list of TFileStat(s) keeping the state for each TDSet
56// element (file).
57//
58// The list of TSlaveStat(s) keeps track of the work (being) done
59// by each slave.
60//
61
62
63//------------------------------------------------------------------------------
64
66
67private:
68 Bool_t fIsDone; // is this element processed
69 TFileNode *fNode; // my FileNode
70 TDSetElement *fElement; // location of the file and its range
71 Long64_t fNextEntry; // cursor in the range, -1 when done
72
73public:
74 TFileStat(TFileNode *node, TDSetElement *elem);
75
76 Bool_t IsDone() const {return fIsDone;}
77 void SetDone() {fIsDone = kTRUE;}
78 TFileNode *GetNode() const {return fNode;}
79 TDSetElement *GetElement() const {return fElement;}
81 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
82};
83
84
86 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
87{
88}
89
90
91//------------------------------------------------------------------------------
92
94
95private:
96 TString fNodeName; // FQDN of the node
97 TList *fFiles; // TDSetElements (files) stored on this node
98 TObject *fUnAllocFileNext; // cursor in fFiles
99 TList *fActFiles; // files with work remaining
100 TObject *fActFileNext; // cursor in fActFiles
101 Int_t fMySlaveCnt; // number of slaves running on this node
102 Int_t fSlaveCnt; // number of external slaves processing files on this node
103
104public:
105 TFileNode(const char *name);
106 ~TFileNode() { delete fFiles; delete fActFiles; }
107
109 void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
110 void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
113 Bool_t IsSortable() const { return kTRUE; }
114
115 const char *GetName() const { return fNodeName.Data(); }
116
117 void Add(TDSetElement *elem)
118 {
119 TFileStat *f = new TFileStat(this,elem);
120 fFiles->Add(f);
122 }
123
125 {
127
128 if (next != 0) {
129 // make file active
130 fActFiles->Add(next);
132
133 // move cursor
135 }
136
137 return (TFileStat *) next;
138 }
139
141 {
142 TObject *next = fActFileNext;
143
144 if (fActFileNext != 0) {
147 }
148
149 return (TFileStat *) next;
150 }
151
153 {
157 }
158
159 Int_t Compare(const TObject *other) const
160 {
161 // Must return -1 if this is smaller than obj, 0 if objects are equal
162 // and 1 if this is larger than obj.
163 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
164 if (!obj) {
165 Error("Compare", "input is not a TPacketizer::TFileNode object");
166 return 0;
167 }
168
169 Int_t myVal = GetSlaveCnt();
170 Int_t otherVal = obj->GetSlaveCnt();
171 if (myVal < otherVal) {
172 return -1;
173 } else if (myVal > otherVal) {
174 return 1;
175 } else {
176 return 0;
177 }
178 }
179
180 void Print(Option_t *) const
181 {
182 std::cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
183 << "\tMySlaveCount " << fMySlaveCnt
184 << "\tSlaveCount " << fSlaveCnt << std::endl;
185 }
186
187 void Reset()
188 {
190 fActFiles->Clear();
191 fActFileNext = 0;
192 fSlaveCnt = 0;
193 fMySlaveCnt = 0;
194 }
195};
196
197
199 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
200 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
201{
202 // Constructor
203
204 fFiles->SetOwner();
206}
207
208
209//------------------------------------------------------------------------------
210
212
213friend class TPacketizer;
214
215private:
216 TFileNode *fFileNode; // corresponding node or 0
217 TFileStat *fCurFile; // file currently being processed
218 TDSetElement *fCurElem; // TDSetElement currently being processed
220public:
221 TSlaveStat(TSlave *slave);
222 ~TSlaveStat();
223
224 TFileNode *GetFileNode() const { return fFileNode; }
225
226 void SetFileNode(TFileNode *node) { fFileNode = node; }
227};
228
229
231 : fFileNode(0), fCurFile(0), fCurElem(0)
232{
233 fSlave = slave;
235}
236
237////////////////////////////////////////////////////////////////////////////////
238/// Cleanup
239
241{
242 SafeDelete(fStatus);
243}
244
246{
247 // Update the status info to the 'st'.
248 // return the difference (*st - *fStatus)
249
250 if (st) {
251 // The entries count is not correct in 'st'
252 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
253 // The last proc time should not be added
254 fStatus->SetLastProcTime(0.);
255 // Get the diff
256 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
257 *fStatus += *diff;
258 // Set the correct value
259 fStatus->SetLastEntries(lastEntries);
260 return diff;
261 } else {
262 Error("AddProcessed", "status arg undefined");
263 return 0;
264 }
265}
266
267//------------------------------------------------------------------------------
268
270
271////////////////////////////////////////////////////////////////////////////////
272/// Constructor
273
275 Long64_t num, TList *input, TProofProgressStatus *st)
276 : TVirtualPacketizer(input, st)
277{
278 PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
279
280 // Init pointer members
281 fPackets = 0;
282 fUnAllocated = 0;
283 fActive = 0;
284 fFileNodes = 0;
285 fMaxPerfIdx = 1;
286 fMaxSlaveCnt = 0;
289
290 if (!fProgressStatus) {
291 Error("TPacketizer", "No progress status");
292 return;
293 }
294
295 Long_t maxSlaveCnt = 0;
296 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
297 if (maxSlaveCnt < 0) {
298 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
299 maxSlaveCnt = 0;
300 }
301 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
302 } else {
303 // Try also with Int_t (recently supported in TProof::SetParameter)
304 Int_t mxslcnt = -1;
305 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
306 if (mxslcnt < 0) {
307 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
308 mxslcnt = 0;
309 }
310 maxSlaveCnt = (Long_t) mxslcnt;
311 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
312 }
313 }
314 if (!maxSlaveCnt) {
315 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize());
316 if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
317 }
318 if (maxSlaveCnt > 0) {
319 fMaxSlaveCnt = maxSlaveCnt;
320 PDB(kPacketizer,1)
321 Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
322 }
323
324 fPackets = new TList;
326
327 fFileNodes = new TList;
329 fUnAllocated = new TList;
331 fActive = new TList;
333
334
335 fValid = kTRUE;
336
337 // Resolve end-point urls to optimize distribution
338 // dset->Lookup(); // moved to TProofPlayerRemote::Process
339
340 // Split into per host entries
341 dset->Reset();
343 while ((e = (TDSetElement*)dset->Next())) {
344 if (e->GetValid()) continue;
345
346 TUrl url = e->GetFileName();
347
348 // Map non URL filenames to dummy host
349 TString host;
350 if ( !url.IsValid() ||
351 (strncmp(url.GetProtocol(),"root", 4) &&
352 strncmp(url.GetProtocol(),"file", 4)) ) {
353 host = "no-host";
354 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
355 host = "localhost";
356 url.SetProtocol("root");
357 } else {
358 host = url.GetHost();
359 }
360 // Get full name for local hosts
361 if (host.Contains("localhost") || host == "127.0.0.1") {
362 url.SetHost(gSystem->HostName());
363 host = url.GetHostFQDN();
364 }
365
366 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
367
368 if (node == 0) {
369 node = new TFileNode(host);
370 fFileNodes->Add(node);
371 }
372
373 node->Add( e );
374 }
375
376 fSlaveStats = new TMap;
378
379 // Record initial available workers
380 Int_t nwrks = AddWorkers(slaves);
381 Info("TPacketizer", "Initial number of workers: %d", nwrks);
382
383 // Setup file & filenode structure
384 Reset();
385 // Optimize the number of files to be open when running on subsample
386 Int_t validateMode = 0;
387 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
388 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
389 if (num > -1)
390 PDB(kPacketizer,2)
391 Info("TPacketizer",
392 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
393 ValidateFiles(dset, slaves, num, byfile);
394
395 if (!fValid) return;
396
397 // apply global range (first,num) to dset and rebuild structure
398 // omitting TDSet elements that are not needed
399
400 Int_t files = 0;
401 fTotalEntries = 0;
402 fUnAllocated->Clear(); // avoid dangling pointers
403 fActive->Clear();
404 fFileNodes->Clear(); // then delete all objects
405 PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
406
407 dset->Reset();
408 Long64_t cur = 0;
409 while (( e = (TDSetElement*)dset->Next())) {
410
411 // Skip invalid or missing file; It will be moved
412 // from the dset to the 'MissingFiles' list in the player.
413 if (!e->GetValid()) continue;
414
415 // The dataset name, if any
416 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
417 fDataSet = e->GetDataSet();
418
419 TUrl url = e->GetFileName();
420 Long64_t eFirst = e->GetFirst();
421 Long64_t eNum = e->GetNum();
422 PDB(kPacketizer,2)
423 Info("TPacketizer", " --> '%s'", e->GetFileName());
424 PDB(kPacketizer,2)
425 Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
426
427 if (!e->GetEntryList()){
428 // this element is before the start of the global range, skip it
429 if (cur + eNum < first) {
430 cur += eNum;
431 PDB(kPacketizer,2)
432 Info("TPacketizer", " --> skip element cur %lld", cur);
433 continue;
434 }
435
436 // this element is after the end of the global range, skip it
437 if (num != -1 && (first+num <= cur)) {
438 cur += eNum;
439 PDB(kPacketizer,2)
440 Info("TPacketizer", " --> drop element cur %lld", cur);
441 continue; // break ??
442 }
443
444 Bool_t inRange = kFALSE;
445 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
446
447 if (cur <= first) {
448 // If this element contains the start of the global range
449 // adjust its start and number of entries
450 e->SetFirst( eFirst + (first - cur) );
451 e->SetNum( e->GetNum() - (first - cur) );
452 PDB(kPacketizer,2)
453 Info("TPacketizer", " --> adjust start %lld and end %lld",
454 eFirst + (first - cur), first + num - cur);
455 inRange = kTRUE;
456 }
457 if (num != -1 && (first+num <= cur+eNum)) {
458 // If this element contains the end of the global range
459 // adjust its number of entries
460 e->SetNum( first + num - e->GetFirst() - cur );
461 PDB(kPacketizer,2)
462 Info("TPacketizer", " --> adjust end %lld", first + num - cur);
463 inRange = kTRUE;
464 }
465
466 } else {
467 // Increment the counter ...
468 PDB(kPacketizer,2)
469 Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
470 cur += eNum;
471 }
472 // Re-adjust eNum and cur, if needed
473 if (inRange) {
474 cur += eNum;
475 eNum = e->GetNum();
476 }
477
478 } else {
479 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
480 if (enl) {
481 eNum = enl->GetN();
482 } else {
483 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
484 eNum = evl ? evl->GetN() : eNum;
485 }
486 if (!eNum)
487 continue;
488 }
489 PDB(kPacketizer,2)
490 Info("TPacketizer", " --> next cur %lld", cur);
491
492 // Map non URL filenames to dummy host
493 TString host;
494 if ( !url.IsValid() ||
495 (strncmp(url.GetProtocol(),"root", 4) &&
496 strncmp(url.GetProtocol(),"file", 4)) ) {
497 host = "no-host";
498 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
499 host = "localhost";
500 url.SetProtocol("root");
501 } else {
502 host = url.GetHostFQDN();
503 }
504 // Get full name for local hosts
505 if (host.Contains("localhost") || host == "127.0.0.1") {
506 url.SetHost(gSystem->HostName());
507 host = url.GetHostFQDN();
508 }
509
510 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
511
512 if ( node == 0 ) {
513 node = new TFileNode( host );
514 fFileNodes->Add( node );
515 }
516
517 ++files;
518 fTotalEntries += eNum;
519 node->Add(e);
520 PDB(kPacketizer,2) e->Print("a");
521 }
522
523 PDB(kPacketizer,1)
524 Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
525 fTotalEntries, files, fFileNodes->GetSize());
526
527 // Set the total number for monitoring
528 if (gPerfStats)
529 gPerfStats->SetNumEvents(fTotalEntries);
530
531 Reset();
532
533 if (fFileNodes->GetSize() == 0) {
534 Info("TPacketizer", "no valid or non-empty file found: setting invalid");
535 // No valid files: set invalid and return
536 fValid = kFALSE;
537 return;
538 }
539
540 // Below we provide a possibility to change the way packet size is
541 // calculated or define the packet size directly.
542 // fPacketAsAFraction can be interpreted as follows:
543 // assuming all slaves have equal processing rate,
544 // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
545 // It substitutes 20 in the old formula to calculate the fPacketSize:
546 // fPacketSize = fTotalEntries / (20 * nslaves)
547 Long_t packetAsAFraction = 20;
548 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
549 Info("Process", "using alternate fraction of query time as a packet Size: %ld",
550 packetAsAFraction);
551 fPacketAsAFraction = (Int_t)packetAsAFraction;
552
553 fPacketSize = 1;
554 if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
555 Info("Process","using alternate packet size: %lld", fPacketSize);
556 } else {
557 // Heuristic for starting packet size
559 Int_t nslaves = fSlaveStats->GetSize();
560 if (nslaves > 0) {
562 if (fPacketSize < 1) fPacketSize = 1;
563 } else {
564 fPacketSize = 1;
565 }
566 }
567
568 PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
569
570 if (!fValid)
572
573 PDB(kPacketizer,1) Info("TPacketizer", "Return");
574}
575
576////////////////////////////////////////////////////////////////////////////////
577/// Destructor.
578
580{
581 if (fSlaveStats) {
583 }
584
590}
591
592////////////////////////////////////////////////////////////////////////////////
593/// Adds new workers. Returns the total number of registered workers after the addition, or -1 on failure.
594
596{
597 if (!workers) {
598 Error("AddWorkers", "Null list of new workers!");
599 return -1;
600 }
601
602 Int_t curNumOfWrks = fSlaveStats->GetEntries();
603
604 TSlave *sl;
605 TIter next(workers);
606 while (( sl = dynamic_cast<TSlave*>(next()) ))
607 if (!fSlaveStats->FindObject(sl)) {
608 fSlaveStats->Add(sl, new TSlaveStat(sl));
610 }
611
612 // If heuristic (and new workers) set the packet size
613 Int_t nwrks = fSlaveStats->GetSize();
614 if (fHeuristicPSiz && nwrks > curNumOfWrks) {
615 if (nwrks > 0) {
617 if (fPacketSize < 1) fPacketSize = 1;
618 } else {
619 fPacketSize = 1;
620 }
621 }
622
623 // Update the max number that can access one file node if the default is used
624 if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
625
626 // Done
627 return nwrks;
628}
629
630////////////////////////////////////////////////////////////////////////////////
631/// Get next unallocated file.
632
634{
635 TFileStat *file = 0;
636
637 if (node != 0) {
638 file = node->GetNextUnAlloc();
639 if (file == 0) RemoveUnAllocNode(node);
640 } else {
641 while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
642 file = node->GetNextUnAlloc();
643 if (file == 0) RemoveUnAllocNode(node);
644 }
645 }
646
647 if (file != 0) {
648 // if needed make node active
649 if (fActive->FindObject(node) == 0) {
650 fActive->Add(node);
651 }
652 }
653
654 return file;
655}
656
657////////////////////////////////////////////////////////////////////////////////
658/// Get next unallocated node.
659
661{
663 PDB(kPacketizer,2) {
664 std::cout << "TPacketizer::NextUnAllocNode()" << std::endl;
666 }
667
669 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
670 PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
672 fn = 0;
673 }
674
675 return fn;
676}
677
678////////////////////////////////////////////////////////////////////////////////
679/// Remove unallocated node.
680
682{
683 fUnAllocated->Remove(node);
684}
685
686////////////////////////////////////////////////////////////////////////////////
687/// Get next active file.
688
690{
691 TFileNode *node;
692 TFileStat *file = 0;
693
694 while (file == 0 && ((node = NextActiveNode()) != 0)) {
695 file = node->GetNextActive();
696 if (file == 0) RemoveActiveNode(node);
697 }
698
699 return file;
700}
701
702////////////////////////////////////////////////////////////////////////////////
703/// Get next active node.
704
706{
707 fActive->Sort();
708 PDB(kPacketizer,2) {
709 Printf("TPacketizer::NextActiveNode : ----------------------");
710 fActive->Print();
711 }
712
713 TFileNode *fn = (TFileNode*) fActive->First();
714 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
715 PDB(kPacketizer,1)
716 Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
717 fn = 0;
718 }
719
720 return fn;
721}
722
723////////////////////////////////////////////////////////////////////////////////
724/// Remove file from the list of actives.
725
727{
728 TFileNode *node = file->GetNode();
729
730 node->RemoveActive(file);
731 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
732}
733
734////////////////////////////////////////////////////////////////////////////////
735/// Remove node from the list of actives.
736
738{
739 fActive->Remove(node);
740}
741
742////////////////////////////////////////////////////////////////////////////////
743/// Reset the internal datastructure for packet distribution.
744
746{
749
750 fActive->Clear();
751
752 TIter files(fFileNodes);
753 TFileNode *fn;
754 while ((fn = (TFileNode*) files.Next()) != 0) {
755 fn->Reset();
756 }
757
758 TIter slaves(fSlaveStats);
759 TObject *key;
760 while ((key = slaves.Next()) != 0) {
761 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
762 if (slstat) {
763 fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
764 if (fn != 0 ) {
765 slstat->SetFileNode(fn);
766 fn->IncMySlaveCnt();
767 }
768 slstat->fCurFile = 0;
769 } else {
770 Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
771 }
772 }
773}
774
775////////////////////////////////////////////////////////////////////////////////
776/// Check existence of file/dir/tree and get the number of entries.
777/// Assumes the files have been setup.
778
779void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
780{
781 TMap slaves_by_sock;
782 TMonitor mon;
783 TList workers;
784
785
786 // Setup the communication infrastructure
787
788 workers.AddAll(slaves);
789 TIter si(slaves);
790 TSlave *slm = 0;
791 while ((slm = (TSlave*)si.Next()) != 0) {
792 PDB(kPacketizer,3)
793 Info("ValidateFiles","socket added to monitor: %p (%s)",
794 slm->GetSocket(), slm->GetName());
795 mon.Add(slm->GetSocket());
796 slaves_by_sock.Add(slm->GetSocket(), slm);
797 PDB(kPacketizer,1)
798 Info("ValidateFiles",
799 "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
800 }
801
802 mon.DeActivateAll();
803
804 ((TProof*)gProof)->DeActivateAsyncInput();
805
806 // Some monitoring systems (TXSocketHandler) need to know this
807 ((TProof*)gProof)->fCurrentMonitor = &mon;
808
809 // Preparing for client notification
810 TString msg("Validating files");
811 UInt_t n = 0;
812 UInt_t tot = dset->GetListOfElements()->GetSize();
813 Bool_t st = kTRUE;
814
815 Long64_t totent = 0, nopenf = 0;
816 while (kTRUE) {
817
818 // send work
819 while( TSlave *s = (TSlave*)workers.First() ) {
820
821 workers.Remove(s);
822
823 // find a file
824
826 if (!slstat) {
827 Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
828 continue;
829 }
830 TFileNode *node = 0;
831 TFileStat *file = 0;
832
833 // try its own node first
834 if ( (node = slstat->GetFileNode()) != 0 ) {
835 file = GetNextUnAlloc(node);
836 if ( file == 0 ) {
837 slstat->SetFileNode(0);
838 }
839 }
840
841 // look for a file on any other node if necessary
842 if (file == 0) {
844 }
845
846 if ( file != 0 ) {
847 // files are done right away
849
850 slstat->fCurFile = file;
851 TDSetElement *elem = file->GetElement();
852 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
853 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
854 // This is decremented when we get the reply
855 file->GetNode()->IncSlaveCnt(slstat->GetName());
857 m << dset->IsTree()
858 << TString(elem->GetFileName())
859 << TString(elem->GetDirectory())
860 << TString(elem->GetObjName());
861
862 s->GetSocket()->Send( m );
863 mon.Activate(s->GetSocket());
864 PDB(kPacketizer,2)
865 Info("ValidateFiles",
866 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
867 s->GetOrdinal(), s->GetName(), s->GetSocket(),
868 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
869 elem->GetDirectory(), elem->GetObjName());
870 } else {
871 // Fill the info
872 elem->SetTDSetOffset(entries);
873 if (entries > 0) {
874 // Most likely valid
875 elem->SetValid();
876 if (!elem->GetEntryList()) {
877 if (elem->GetFirst() > entries) {
878 Error("ValidateFiles",
879 "first (%lld) higher then number of entries (%lld) in %s",
880 elem->GetFirst(), entries, elem->GetFileName());
881 // disable element
882 slstat->fCurFile->SetDone();
883 elem->Invalidate();
885 }
886 if (elem->GetNum() == -1) {
887 elem->SetNum(entries - elem->GetFirst());
888 } else if (elem->GetFirst() + elem->GetNum() > entries) {
889 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
890 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
891 entries, elem->GetFileName());
892 elem->SetNum(entries - elem->GetFirst());
893 }
894 PDB(kPacketizer,2)
895 Info("ValidateFiles",
896 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
897 }
898 }
899 // Notify the client
900 n++;
901 gProof->SendDataSetStatus(msg, n, tot, st);
902
903 // This worker is ready for the next validation
904 workers.Add(s);
905 }
906 }
907 }
908
909 // Check if there is anything to wait for
910 if (mon.GetActive() == 0) {
911 if (byfile && maxent > 0 && totent > 0) {
912 // How many files do we still need ?
913 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
914 if (nrestf <= 0 && maxent > totent) nrestf = 1;
915 if (nrestf > 0) {
916 PDB(kPacketizer,3)
917 Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
918 maxent, totent, nopenf, nrestf);
919 si.Reset();
920 while ((slm = (TSlave *) si.Next()) && nrestf--) {
921 workers.Add(slm);
922 }
923 continue;
924 } else {
925 PDB(kPacketizer,3)
926 Info("ValidateFiles", "no need to validate more files");
927 break;
928 }
929 } else {
930 break;
931 }
932 }
933
934 PDB(kPacketizer,3) {
935 Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
936 TList *act = mon.GetListOfActives();
937 TIter next(act);
938 TSocket *s = 0;
939 while ((s = (TSocket*) next())) {
940 Info("ValidateFiles", "found sck: %p", s);
941 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
942 if (sl)
943 Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
944 }
945 delete act;
946 }
947
948 TSocket *sock = mon.Select();
949 // If we have been interrupted break
950 if (!sock) {
951 Error("ValidateFiles", "selection has been interrupted - STOP");
952 mon.DeActivateAll();
953 fValid = kFALSE;
954 break;
955 }
956 mon.DeActivate(sock);
957
958 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
959
960 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
961 if (!sock->IsValid()) {
962 // A socket got invalid during validation
963 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
964 slave->GetOrdinal(), slave->GetName());
965 ((TProof*)gProof)->MarkBad(slave);
966 fValid = kFALSE;
967 break;
968 }
969
970 TMessage *reply;
971
972 if ( sock->Recv(reply) <= 0 ) {
973 // Help! lost a slave?
974 ((TProof*)gProof)->MarkBad(slave);
975 fValid = kFALSE;
976 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
977 slave->GetOrdinal(), slave->GetName());
978 continue;
979 }
980
981 if (reply->What() != kPROOF_GETENTRIES) {
982 // Not what we want: handover processing to the central machinery
983 Int_t what = reply->What();
984 ((TProof*)gProof)->HandleInputMessage(slave, reply);
985 if (what == kPROOF_FATAL) {
986 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
987 slave->GetOrdinal(), slave->GetName());
988 fValid = kFALSE;
989 } else {
990 // Reactivate the socket
991 mon.Activate(sock);
992 }
993 // Get next message
994 continue;
995 }
996
997 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
998 TDSetElement *e = slavestat->fCurFile->GetElement();
999 slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1000 Long64_t entries;
1001
1002 (*reply) >> entries;
1003
1004 // Extract object name, if there
1005 if ((reply->BufferSize() > reply->Length())) {
1006 TString objname;
1007 (*reply) >> objname;
1008 e->SetTitle(objname);
1009 }
1010
1011 e->SetTDSetOffset(entries);
1012 if ( entries > 0 ) {
1013
1014 // This dataset element is most likely valid
1015 e->SetValid();
1016
1017 //if (!e->GetEventList()) {
1018 if (!e->GetEntryList()){
1019 if ( e->GetFirst() > entries ) {
1020 Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
1021 e->GetFirst(), entries, e->GetFileName());
1022
1023 // Invalidate the element
1024 slavestat->fCurFile->SetDone();
1025 e->Invalidate();
1027 }
1028
1029 if ( e->GetNum() == -1 ) {
1030 e->SetNum( entries - e->GetFirst() );
1031 } else if ( e->GetFirst() + e->GetNum() > entries ) {
1032 Error("ValidateFiles",
1033 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1034 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1035 e->SetNum(entries - e->GetFirst());
1036 }
1037 }
1038
1039 // Count
1040 totent += entries;
1041 nopenf++;
1042
1043 // Notify the client
1044 n++;
1045 gProof->SendDataSetStatus(msg, n, tot, st);
1046
1047 } else {
1048
1049 Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
1050 //
1051 // Need to fix this with a user option to allow incomplete file sets (rdm)
1052 //
1053 //fValid = kFALSE; // all element must be readable!
1054 if (gProofServ) {
1056 m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
1058 }
1059
1060 // Invalidate the element
1061 e->Invalidate();
1063 }
1064 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1065
1066 // Ready for the next job, unless we have enough files
1067 if (maxent < 0 || ((totent < maxent) && !byfile))
1068 workers.Add(slave);
1069 }
1070
1071 // report std. output from slaves??
1072
1073 ((TProof*)gProof)->ActivateAsyncInput();
1074
1075 // This needs to be reset
1076 ((TProof*)gProof)->fCurrentMonitor = 0;
1077
1078 // No reason to continue if invalid
1079 if (!fValid)
1080 return;
1081
1082
1083 // compute the offset for each file element
1084 Long64_t offset = 0;
1085 Long64_t newOffset = 0;
1086 TIter next(dset->GetListOfElements());
1087 TDSetElement *el;
1088 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1089 newOffset = offset + el->GetTDSetOffset();
1090 el->SetTDSetOffset(offset);
1091 offset = newOffset;
1092 }
1093}
1094
1095////////////////////////////////////////////////////////////////////////////////
1096/// Get entries processed by the specified slave.
1097
1099{
1100 if ( fSlaveStats == 0 ) return 0;
1101
1102 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1103
1104 if ( slstat == 0 ) return 0;
1105
1106 return slstat->GetEntriesProcessed();
1107}
1108
1109////////////////////////////////////////////////////////////////////////////////
1110/// Get Estimation of the current rate; just summing the current rates of
1111/// the active workers
1112
1114{
1115 all = kTRUE;
1116 // Loop over the workers
1117 Float_t currate = 0.;
1118 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1119 TIter nxw(fSlaveStats);
1120 TObject *key;
1121 while ((key = nxw()) != 0) {
1122 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1123 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1124 // Sum-up the current rates
1125 currate += slstat->GetProgressStatus()->GetCurrentRate();
1126 } else {
1127 all = kFALSE;
1128 }
1129 }
1130 }
1131 // Done
1132 return currate;
1133}
1134
1135////////////////////////////////////////////////////////////////////////////////
1136/// Get next packet
1137
1139{
1140 if ( !fValid ) {
1141 return 0;
1142 }
1143
1144 // Find worker
1145
1146 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1147
1148 R__ASSERT( slstat != 0 );
1149
1150 PDB(kPacketizer,1)
1151 Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1152 // update stats & free old element
1153
1154 Bool_t firstPacket = kFALSE;
1155 if ( slstat->fCurElem != 0 ) {
1156 Double_t latency = 0., proctime = 0., proccpu = 0.;
1157 Long64_t bytesRead = -1;
1158 Long64_t totalEntries = -1;
1159 Long64_t totev = 0;
1160 Long64_t numev = slstat->fCurElem->GetNum();
1161
1162 fPackets->Add(slstat->fCurElem);
1163
1164 if (sl->GetProtocol() > 18) {
1165 TProofProgressStatus *status = 0;
1166 (*r) >> latency;
1167 (*r) >> status;
1168
1169 // Calculate the progress made in the last packet
1170 TProofProgressStatus *progress = 0;
1171 if (status) {
1172 // update the worker status
1173 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1174 progress = slstat->AddProcessed(status);
1175 if (progress) {
1176 // (*fProgressStatus) += *progress;
1177 proctime = progress->GetProcTime();
1178 proccpu = progress->GetCPUTime();
1179 totev = status->GetEntries(); // for backward compatibility
1180 bytesRead = progress->GetBytesRead();
1181 delete progress;
1182 }
1183 delete status;
1184 } else
1185 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
1186 } else {
1187
1188 (*r) >> latency >> proctime >> proccpu;
1189
1190 // only read new info if available
1191 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1192 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1193 if (r->BufferSize() > r->Length()) (*r) >> totev;
1194
1195 numev = totev - slstat->GetEntriesProcessed();
1196 if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1197 if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1198 if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1199 }
1200
1201 if (fProgressStatus) {
1202 if (numev > 0) fProgressStatus->IncEntries(numev);
1203 if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1204 if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1205 }
1206 PDB(kPacketizer,2)
1207 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1208 sl->GetOrdinal(), sl->GetName(),
1209 numev, latency, proctime, proccpu, bytesRead);
1210
1211 if (gPerfStats)
1212 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1213 numev, latency, proctime, proccpu, bytesRead);
1214
1215 slstat->fCurElem = 0;
1217 HandleTimer(0); // Send last timer message
1218 delete fProgress; fProgress = 0;
1219 }
1220 } else {
1221 firstPacket = kTRUE;
1222 }
1223
1224 if ( fStop ) {
1225 HandleTimer(0);
1226 return 0;
1227 }
1228
1229 // get a file if needed
1230
1231 TFileStat *file = slstat->fCurFile;
1232
1233 if ( file != 0 && file->IsDone() ) {
1234 file->GetNode()->DecSlaveCnt(slstat->GetName());
1235 if (gPerfStats)
1236 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1237 file->GetElement()->GetFileName(), kFALSE);
1238 file = 0;
1239 }
1240 // Reset the current file field
1241 slstat->fCurFile = file;
1242
1243 if (!file) {
1244
1245 // Try its own node first
1246 if (slstat->GetFileNode() != 0) {
1247 file = GetNextUnAlloc(slstat->GetFileNode());
1248 if (!file) {
1249 slstat->SetFileNode(0);
1250 }
1251 }
1252
1253 // try to find an unused filenode first
1254 if (!file) {
1255 file = GetNextUnAlloc();
1256 }
1257
1258 // then look at the active filenodes
1259 if (!file) {
1260 file = GetNextActive();
1261 }
1262
1263 if (!file) return 0;
1264
1265 slstat->fCurFile = file;
1266 file->GetNode()->IncSlaveCnt(slstat->GetName());
1267 if (gPerfStats)
1268 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1269 file->GetNode()->GetName(),
1270 file->GetElement()->GetFileName(), kTRUE);
1271 }
1272
1273 // get a packet
1274
1275 TDSetElement *base = file->GetElement();
1277 if (num < 1) num = 1;
1278
1279 Long64_t first = file->GetNextEntry();
1280 Long64_t last = base->GetFirst() + base->GetNum();
1281
1282 if ( first + num >= last ) {
1283 num = last - first;
1284 file->SetDone(); // done
1285
1286 // delete file from active list (unalloc list is single pass, no delete needed)
1288
1289 } else {
1290 file->MoveNextEntry(num);
1291 }
1292
1293
1294 slstat->fCurElem = CreateNewPacket(base, first, num);
1295 if (base->GetEntryList())
1296 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1297
1298 // Flag the first packet of a new run (dataset)
1299 if (firstPacket)
1301 else
1303
1304 PDB(kPacketizer,2)
1305 Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1306
1307 return slstat->fCurElem;
1308}
1309
1310////////////////////////////////////////////////////////////////////////////////
1311/// Return the number of workers still processing
1312
1314{
1315 Int_t actw = 0;
1316 TIter nxw(fSlaveStats);
1317 TObject *key;
1318 while ((key = nxw())) {
1319 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1320 if (wrkstat && wrkstat->fCurFile) actw++;
1321 }
1322 // Done
1323 return actw;
1324}
@ kPROOF_FATAL
@ kPROOF_GETENTRIES
@ kPROOF_MESSAGE
ROOT::R::TRInterface & r
Definition Object.C:4
#define SafeDelete(p)
Definition RConfig.hxx:537
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
int Int_t
Definition RtypesCore.h:45
const Bool_t kFALSE
Definition RtypesCore.h:101
long Long_t
Definition RtypesCore.h:54
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
float Float_t
Definition RtypesCore.h:57
const Bool_t kTRUE
Definition RtypesCore.h:100
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:364
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Definition TError.h:118
char name[80]
Definition TGX11.cxx:110
#define PDB(mask, level)
Definition TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition TProofServ.h:347
R__EXTERN TProof * gProof
Definition TProof.h:1077
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
#define gPerfStats
Int_t BufferSize() const
Definition TBuffer.h:98
Int_t Length() const
Definition TBuffer.h:100
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual Int_t GetEntries() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Definition TDSet.h:66
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition TDSet.cxx:429
const char * GetObjName() const
Definition TDSet.h:120
Long64_t GetNum() const
Definition TDSet.h:114
TObject * GetEntryList() const
Definition TDSet.h:131
void Invalidate()
Definition TDSet.h:134
void SetTDSetOffset(Long64_t offset)
Definition TDSet.h:129
void SetNum(Long64_t num)
Definition TDSet.h:118
const char * GetDirectory() const
Return directory where to look for object.
Definition TDSet.cxx:253
void SetValid()
Definition TDSet.h:135
void SetEntryList(TObject *aList, Long64_t first=-1, Long64_t num=-1)
Set entry (or event) list for this element.
Definition TDSet.cxx:599
Long64_t GetTDSetOffset() const
Definition TDSet.h:128
const char * GetFileName() const
Definition TDSet.h:111
Long64_t GetFirst() const
Definition TDSet.h:112
This class implements a data set to be used for PROOF processing.
Definition TDSet.h:153
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition TDSet.cxx:413
virtual void Reset()
Reset or initialize access to the elements.
Definition TDSet.cxx:1369
Bool_t IsTree() const
Definition TDSet.h:225
TList * GetListOfElements() const
Definition TDSet.h:231
@ kSomeInvalid
Definition TDSet.h:161
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual Long64_t GetN() const
Definition TEntryList.h:77
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual Int_t GetN() const
Definition TEventList.h:56
TObject * Next()
void Reset()
A doubly linked list.
Definition TList.h:38
virtual void Add(TObject *obj)
Definition TList.h:81
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition TList.cxx:330
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition TList.cxx:822
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition TList.cxx:578
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition TList.cxx:659
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition TList.cxx:402
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition TList.cxx:937
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition TMap.cxx:54
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition TMap.cxx:151
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition TMap.cxx:236
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition TMap.cxx:215
UInt_t What() const
Definition TMessage.h:75
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition TMonitor.cxx:250
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
virtual void DeActivateAll()
De-activate all activated sockets.
Definition TMonitor.cxx:302
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition TMonitor.cxx:498
virtual const char * GetTitle() const
Returns title of object.
Definition TNamed.h:48
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:429
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:949
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:766
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:963
void ResetBit(UInt_t f)
Definition TObject.h:200
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:937
Int_t GetSlaveCnt() const
void DecSlaveCnt(const char *slave)
TFileNode(const char *name)
TFileStat * GetNextActive()
void Print(Option_t *) const
This method must be overridden when a class wants to print itself.
void Add(TDSetElement *elem)
Int_t GetNumberOfActiveFiles() const
const char * GetName() const
Returns name of object.
Int_t Compare(const TObject *other) const
Compare abstract method.
TFileStat * GetNextUnAlloc()
void IncSlaveCnt(const char *slave)
Bool_t IsSortable() const
void RemoveActive(TFileStat *file)
TDSetElement * fElement
TFileStat(TFileNode *node, TDSetElement *elem)
Long64_t GetNextEntry() const
Bool_t IsDone() const
void MoveNextEntry(Long64_t step)
TFileNode * GetNode() const
TDSetElement * GetElement() const
TSlaveStat(TSlave *slave)
TProofProgressStatus * AddProcessed(TProofProgressStatus *st)
void SetFileNode(TFileNode *node)
TFileNode * GetFileNode() const
This class generates packets to be processed on PROOF worker servers.
Definition TPacketizer.h:39
TFileStat * GetNextActive()
Get next active file.
Long_t fMaxSlaveCnt
Definition TPacketizer.h:58
TFileNode * NextActiveNode()
Get next active node.
virtual ~TPacketizer()
Destructor.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Bool_t fDefMaxWrkNode
Definition TPacketizer.h:69
Int_t fMaxPerfIdx
Definition TPacketizer.h:56
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
TFileNode * NextUnAllocNode()
Get next unallocated node.
TList * fPackets
Definition TPacketizer.h:47
Int_t GetActiveWorkers()
Return the number of workers still processing.
TList * fUnAllocated
Definition TPacketizer.h:50
Bool_t fHeuristicPSiz
Definition TPacketizer.h:68
void Reset()
Reset the internal datastructure for packet distribution.
Int_t fPacketAsAFraction
Definition TPacketizer.h:60
TList * fActive
Definition TPacketizer.h:51
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TList * fFileNodes
Definition TPacketizer.h:49
Long64_t fPacketSize
Definition TPacketizer.h:53
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
void IncBytesRead(Long64_t bytesRead)
Long64_t GetEntries() const
Double_t GetCurrentRate() const
Get current rate. Return the average rate if the current is not defined.
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition TProofServ.h:257
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition TProof.h:316
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition TProof.cxx:9918
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition TProof.cxx:9336
Class describing a PROOF worker server.
Definition TSlave.h:46
TSocket * GetSocket() const
Definition TSlave.h:134
Int_t GetProtocol() const
Definition TSlave.h:133
const char * GetName() const
Returns name of object.
Definition TSlave.h:124
const char * GetOrdinal() const
Definition TSlave.h:131
Int_t GetPerfIdx() const
Definition TSlave.h:132
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:136
const char * Data() const
Definition TString.h:369
Bool_t IsNull() const
Definition TString.h:407
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:624
virtual const char * HostName()
Return the system's host name.
Definition TSystem.cxx:306
This class represents a WWW compatible URL.
Definition TUrl.h:33
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition TUrl.cxx:522
Bool_t IsValid() const
Definition TUrl.h:79
const char * GetHost() const
Definition TUrl.h:67
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition TUrl.cxx:471
void SetHost(const char *host)
Definition TUrl.h:84
const char * GetProtocol() const
Definition TUrl.h:64
const char * GetName() const
Returns name of object.
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries.
const Int_t n
Definition legend1.C:16
Definition file.py:1
Definition first.py:1
static const char * what
Definition stlLoader.cc:6
auto * m
Definition textangle.C:8