Logo ROOT   6.10/09
Reference Guide
TPacketizer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 18/03/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TPacketizer
13 \ingroup proofkernel
14 
15 This class generates packets to be processed on PROOF worker servers.
16 A packet is an event range (begin entry and number of entries) or
17 object range (first object and number of objects) in a TTree
18 (entries) or a directory (objects) in a file.
19 Packets are generated taking into account the performance of the
20 remote machine, the time it took to process a previous packet on
21 the remote machine, the locality of the database files, etc.
22 
23 */
24 
25 #include "TPacketizer.h"
26 
27 #include "Riostream.h"
28 #include "TDSet.h"
29 #include "TEnv.h"
30 #include "TError.h"
31 #include "TEventList.h"
32 #include "TEntryList.h"
33 #include "TMap.h"
34 #include "TMessage.h"
35 #include "TMonitor.h"
36 #include "TNtupleD.h"
37 #include "TObject.h"
38 #include "TParameter.h"
39 #include "TPerfStats.h"
40 #include "TProofDebug.h"
41 #include "TProof.h"
42 #include "TProofPlayer.h"
43 #include "TProofServ.h"
44 #include "TSlave.h"
45 #include "TSocket.h"
46 #include "TTimer.h"
47 #include "TUrl.h"
48 #include "TClass.h"
49 #include "TMath.h"
50 #include "TObjString.h"
51 
52 //
53 // The following three utility classes manage the state of the
54 // work to be performed and the slaves involved in the process.
55 // A list of TFileNode(s) describes the hosts with files, each
56 // has a list of TFileStat(s) keeping the state for each TDSet
57 // element (file).
58 //
59 // The list of TSlaveStat(s) keeps track of the work (being) done
60 // by each slave.
61 //
62 
63 
64 //------------------------------------------------------------------------------
65 
66 class TPacketizer::TFileStat : public TObject {
67 
68 private:
69  Bool_t fIsDone; // is this element processed
70  TFileNode *fNode; // my FileNode
71  TDSetElement *fElement; // location of the file and its range
72  Long64_t fNextEntry; // cursor in the range, -1 when done
73 
74 public:
75  TFileStat(TFileNode *node, TDSetElement *elem);
76 
77  Bool_t IsDone() const {return fIsDone;}
78  void SetDone() {fIsDone = kTRUE;}
79  TFileNode *GetNode() const {return fNode;}
80  TDSetElement *GetElement() const {return fElement;}
81  Long64_t GetNextEntry() const {return fNextEntry;}
82  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
83 };
84 
85 
86 TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
87  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
88 {
89 }
90 
91 
92 //------------------------------------------------------------------------------
93 
94 class TPacketizer::TFileNode : public TObject {
95 
96 private:
97  TString fNodeName; // FQDN of the node
98  TList *fFiles; // TDSetElements (files) stored on this node
99  TObject *fUnAllocFileNext; // cursor in fFiles
100  TList *fActFiles; // files with work remaining
101  TObject *fActFileNext; // cursor in fActFiles
102  Int_t fMySlaveCnt; // number of slaves running on this node
103  Int_t fSlaveCnt; // number of external slaves processing files on this node
104 
105 public:
106  TFileNode(const char *name);
107  ~TFileNode() { delete fFiles; delete fActFiles; }
108 
109  void IncMySlaveCnt() { fMySlaveCnt++; }
110  void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
111  void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
112  Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
113  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
114  Bool_t IsSortable() const { return kTRUE; }
115 
116  const char *GetName() const { return fNodeName.Data(); }
117 
118  void Add(TDSetElement *elem)
119  {
120  TFileStat *f = new TFileStat(this,elem);
121  fFiles->Add(f);
122  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
123  }
124 
125  TFileStat *GetNextUnAlloc()
126  {
127  TObject *next = fUnAllocFileNext;
128 
129  if (next != 0) {
130  // make file active
131  fActFiles->Add(next);
132  if (fActFileNext == 0) fActFileNext = fActFiles->First();
133 
134  // move cursor
135  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
136  }
137 
138  return (TFileStat *) next;
139  }
140 
141  TFileStat *GetNextActive()
142  {
143  TObject *next = fActFileNext;
144 
145  if (fActFileNext != 0) {
146  fActFileNext = fActFiles->After(fActFileNext);
147  if (fActFileNext == 0) fActFileNext = fActFiles->First();
148  }
149 
150  return (TFileStat *) next;
151  }
152 
153  void RemoveActive(TFileStat *file)
154  {
155  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
156  fActFiles->Remove(file);
157  if (fActFileNext == 0) fActFileNext = fActFiles->First();
158  }
159 
160  Int_t Compare(const TObject *other) const
161  {
162  // Must return -1 if this is smaller than obj, 0 if objects are equal
163  // and 1 if this is larger than obj.
164  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
165  if (!obj) {
166  Error("Compare", "input is not a TPacketizer::TFileNode object");
167  return 0;
168  }
169 
170  Int_t myVal = GetSlaveCnt();
171  Int_t otherVal = obj->GetSlaveCnt();
172  if (myVal < otherVal) {
173  return -1;
174  } else if (myVal > otherVal) {
175  return 1;
176  } else {
177  return 0;
178  }
179  }
180 
181  void Print(Option_t *) const
182  {
183  std::cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
184  << "\tMySlaveCount " << fMySlaveCnt
185  << "\tSlaveCount " << fSlaveCnt << std::endl;
186  }
187 
188  void Reset()
189  {
190  fUnAllocFileNext = fFiles->First();
191  fActFiles->Clear();
192  fActFileNext = 0;
193  fSlaveCnt = 0;
194  fMySlaveCnt = 0;
195  }
196 };
197 
198 
199 TPacketizer::TFileNode::TFileNode(const char *name)
200  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
201  fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
202 {
203  // Constructor
204 
205  fFiles->SetOwner();
206  fActFiles->SetOwner(kFALSE);
207 }
208 
209 
210 //------------------------------------------------------------------------------
211 
212 class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
213 
214 friend class TPacketizer;
215 
216 private:
217  TFileNode *fFileNode; // corresponding node or 0
218  TFileStat *fCurFile; // file currently being processed
219  TDSetElement *fCurElem; // TDSetElement currently being processed
221 public:
222  TSlaveStat(TSlave *slave);
223  ~TSlaveStat();
224 
225  TFileNode *GetFileNode() const { return fFileNode; }
226 
227  void SetFileNode(TFileNode *node) { fFileNode = node; }
228 };
229 
230 
231 TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
232  : fFileNode(0), fCurFile(0), fCurElem(0)
233 {
234  fSlave = slave;
235  fStatus = new TProofProgressStatus();
236 }
237 
238 ////////////////////////////////////////////////////////////////////////////////
239 /// Cleanup
240 
241 TPacketizer::TSlaveStat::~TSlaveStat()
242 {
243  SafeDelete(fStatus);
244 }
245 
246 TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
247 {
248  // Update the status info to the 'st'.
249  // return the difference (*st - *fStatus)
250 
251  if (st) {
252  // The entriesis not correct in 'st'
253  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
254  // The last proc time should not be added
255  fStatus->SetLastProcTime(0.);
256  // Get the diff
257  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
258  *fStatus += *diff;
259  // Set the correct value
260  fStatus->SetLastEntries(lastEntries);
261  return diff;
262  } else {
263  Error("AddProcessed", "status arg undefined");
264  return 0;
265  }
266 }
267 
268 //------------------------------------------------------------------------------
269 
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Constructor
274 
276  Long64_t num, TList *input, TProofProgressStatus *st)
277  : TVirtualPacketizer(input, st)
278 {
279  PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
280 
281  // Init pointer members
282  fPackets = 0;
283  fUnAllocated = 0;
284  fActive = 0;
285  fFileNodes = 0;
286  fMaxPerfIdx = 1;
287  fMaxSlaveCnt = 0;
290 
291  if (!fProgressStatus) {
292  Error("TPacketizer", "No progress status");
293  return;
294  }
295 
296  Long_t maxSlaveCnt = 0;
297  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
298  if (maxSlaveCnt < 0) {
299  Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
300  maxSlaveCnt = 0;
301  }
302  if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
303  } else {
304  // Try also with Int_t (recently supported in TProof::SetParameter)
305  Int_t mxslcnt = -1;
306  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
307  if (mxslcnt < 0) {
308  Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
309  mxslcnt = 0;
310  }
311  maxSlaveCnt = (Long_t) mxslcnt;
312  if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
313  }
314  }
315  if (!maxSlaveCnt) {
316  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize());
317  if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
318  }
319  if (maxSlaveCnt > 0) {
320  fMaxSlaveCnt = maxSlaveCnt;
321  PDB(kPacketizer,1)
322  Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
323  }
324 
325  fPackets = new TList;
326  fPackets->SetOwner();
327 
328  fFileNodes = new TList;
329  fFileNodes->SetOwner();
330  fUnAllocated = new TList;
332  fActive = new TList;
334 
335 
336  fValid = kTRUE;
337 
338  // Resolve end-point urls to optmize distribution
339  // dset->Lookup(); // moved to TProofPlayerRemote::Process
340 
341  // Split into per host entries
342  dset->Reset();
343  TDSetElement *e;
344  while ((e = (TDSetElement*)dset->Next())) {
345  if (e->GetValid()) continue;
346 
347  TUrl url = e->GetFileName();
348 
349  // Map non URL filenames to dummy host
350  TString host;
351  if ( !url.IsValid() ||
352  (strncmp(url.GetProtocol(),"root", 4) &&
353  strncmp(url.GetProtocol(),"rfio", 4) &&
354  strncmp(url.GetProtocol(),"file", 4)) ) {
355  host = "no-host";
356  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
357  host = "localhost";
358  url.SetProtocol("root");
359  } else {
360  host = url.GetHost();
361  }
362  // Get full name for local hosts
363  if (host.Contains("localhost") || host == "127.0.0.1") {
364  url.SetHost(gSystem->HostName());
365  host = url.GetHostFQDN();
366  }
367 
368  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
369 
370  if (node == 0) {
371  node = new TFileNode(host);
372  fFileNodes->Add(node);
373  }
374 
375  node->Add( e );
376  }
377 
378  fSlaveStats = new TMap;
380 
381  // Record initial available workers
382  Int_t nwrks = AddWorkers(slaves);
383  Info("TPacketizer", "Initial number of workers: %d", nwrks);
384 
385  // Setup file & filenode structure
386  Reset();
387  // Optimize the number of files to be open when running on subsample
388  Int_t validateMode = 0;
389  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
390  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
391  if (num > -1)
392  PDB(kPacketizer,2)
393  Info("TPacketizer",
394  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
395  ValidateFiles(dset, slaves, num, byfile);
396 
397  if (!fValid) return;
398 
399  // apply global range (first,num) to dset and rebuild structure
400  // ommitting TDSet elements that are not needed
401 
402  Int_t files = 0;
403  fTotalEntries = 0;
404  fUnAllocated->Clear(); // avoid dangling pointers
405  fActive->Clear();
406  fFileNodes->Clear(); // then delete all objects
407  PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
408 
409  dset->Reset();
410  Long64_t cur = 0;
411  while (( e = (TDSetElement*)dset->Next())) {
412 
413  // Skip invalid or missing file; It will be moved
414  // from the dset to the 'MissingFiles' list in the player.
415  if (!e->GetValid()) continue;
416 
417  // The dataset name, if any
418  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
419  fDataSet = e->GetDataSet();
420 
421  TUrl url = e->GetFileName();
422  Long64_t eFirst = e->GetFirst();
423  Long64_t eNum = e->GetNum();
424  PDB(kPacketizer,2)
425  Info("TPacketizer", " --> '%s'", e->GetFileName());
426  PDB(kPacketizer,2)
427  Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
428 
429  if (!e->GetEntryList()){
430  // this element is before the start of the global range, skip it
431  if (cur + eNum < first) {
432  cur += eNum;
433  PDB(kPacketizer,2)
434  Info("TPacketizer", " --> skip element cur %lld", cur);
435  continue;
436  }
437 
438  // this element is after the end of the global range, skip it
439  if (num != -1 && (first+num <= cur)) {
440  cur += eNum;
441  PDB(kPacketizer,2)
442  Info("TPacketizer", " --> drop element cur %lld", cur);
443  continue; // break ??
444  }
445 
446  Bool_t inRange = kFALSE;
447  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
448 
449  if (cur <= first) {
450  // If this element contains the start of the global range
451  // adjust its start and number of entries
452  e->SetFirst( eFirst + (first - cur) );
453  e->SetNum( e->GetNum() - (first - cur) );
454  PDB(kPacketizer,2)
455  Info("TPacketizer", " --> adjust start %lld and end %lld",
456  eFirst + (first - cur), first + num - cur);
457  inRange = kTRUE;
458  }
459  if (num != -1 && (first+num <= cur+eNum)) {
460  // If this element contains the end of the global range
461  // adjust its number of entries
462  e->SetNum( first + num - e->GetFirst() - cur );
463  PDB(kPacketizer,2)
464  Info("TPacketizer", " --> adjust end %lld", first + num - cur);
465  inRange = kTRUE;
466  }
467 
468  } else {
469  // Increment the counter ...
470  PDB(kPacketizer,2)
471  Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
472  cur += eNum;
473  }
474  // Re-adjust eNum and cur, if needed
475  if (inRange) {
476  cur += eNum;
477  eNum = e->GetNum();
478  }
479 
480  } else {
481  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
482  if (enl) {
483  eNum = enl->GetN();
484  } else {
485  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
486  eNum = evl ? evl->GetN() : eNum;
487  }
488  if (!eNum)
489  continue;
490  }
491  PDB(kPacketizer,2)
492  Info("TPacketizer", " --> next cur %lld", cur);
493 
494  // Map non URL filenames to dummy host
495  TString host;
496  if ( !url.IsValid() ||
497  (strncmp(url.GetProtocol(),"root", 4) &&
498  strncmp(url.GetProtocol(),"rfio", 4) &&
499  strncmp(url.GetProtocol(),"file", 4)) ) {
500  host = "no-host";
501  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
502  host = "localhost";
503  url.SetProtocol("root");
504  } else {
505  host = url.GetHostFQDN();
506  }
507  // Get full name for local hosts
508  if (host.Contains("localhost") || host == "127.0.0.1") {
509  url.SetHost(gSystem->HostName());
510  host = url.GetHostFQDN();
511  }
512 
513  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
514 
515  if ( node == 0 ) {
516  node = new TFileNode( host );
517  fFileNodes->Add( node );
518  }
519 
520  ++files;
521  fTotalEntries += eNum;
522  node->Add(e);
523  PDB(kPacketizer,2) e->Print("a");
524  }
525 
526  PDB(kPacketizer,1)
527  Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
528  fTotalEntries, files, fFileNodes->GetSize());
529 
530  // Set the total number for monitoring
531  if (gPerfStats)
532  gPerfStats->SetNumEvents(fTotalEntries);
533 
534  Reset();
535 
536  if (fFileNodes->GetSize() == 0) {
537  Info("TPacketizer", "no valid or non-empty file found: setting invalid");
538  // No valid files: set invalid and return
539  fValid = kFALSE;
540  return;
541  }
542 
543  // Below we provide a possibility to change the way packet size is
544  // calculated or define the packet size directly.
545  // fPacketAsAFraction can be interpreted as follows:
546  // assuming all slaves have equal processing rate,
547  // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
548  // It substitutes 20 in the old formula to calculate the fPacketSize:
549  // fPacketSize = fTotalEntries / (20 * nslaves)
550  Long_t packetAsAFraction = 20;
551  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
552  Info("Process", "using alternate fraction of query time as a packet Size: %ld",
553  packetAsAFraction);
554  fPacketAsAFraction = (Int_t)packetAsAFraction;
555 
556  fPacketSize = 1;
557  if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
558  Info("Process","using alternate packet size: %lld", fPacketSize);
559  } else {
560  // Heuristic for starting packet size
562  Int_t nslaves = fSlaveStats->GetSize();
563  if (nslaves > 0) {
565  if (fPacketSize < 1) fPacketSize = 1;
566  } else {
567  fPacketSize = 1;
568  }
569  }
570 
571  PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
572 
573  if (!fValid)
575 
576  PDB(kPacketizer,1) Info("TPacketizer", "Return");
577 }
578 
579 ////////////////////////////////////////////////////////////////////////////////
580 /// Destructor.
581 
583 {
584  if (fSlaveStats) {
586  }
587 
593 }
594 
595 ////////////////////////////////////////////////////////////////////////////////
596 /// Adds new workers. Returns the number of workers added, or -1 on failure.
597 
599 {
   // Registers every TSlave found in 'workers' that is not yet a key of
   // fSlaveStats, creating a new TSlaveStat for it, and returns the resulting
   // size of fSlaveStats (or -1, after an error message, if 'workers' is null).
   // NOTE(review): the numbering embedded in this listing skips 598, 612 and
   // 619, so the signature line and two statements appear to be missing from
   // this view; the comments below only describe what is visible.
600  if (!workers) {
601  Error("AddWorkers", "Null list of new workers!");
602  return -1;
603  }
604 
   // Remember how many workers were known before adding, to detect growth below
605  Int_t curNumOfWrks = fSlaveStats->GetEntries();
606 
   // The dynamic_cast makes the loop stop at the first list entry that is not a TSlave
607  TSlave *sl;
608  TIter next(workers);
609  while (( sl = dynamic_cast<TSlave*>(next()) ))
610  if (!fSlaveStats->FindObject(sl)) {
611  fSlaveStats->Add(sl, new TSlaveStat(sl));
613  }
614 
615  // If heuristic (and new workers) set the packet size
616  Int_t nwrks = fSlaveStats->GetSize();
617  if (fHeuristicPSiz && nwrks > curNumOfWrks) {
618  if (nwrks > 0) {
   // The packet size is never allowed to drop below one entry
620  if (fPacketSize < 1) fPacketSize = 1;
621  } else {
622  fPacketSize = 1;
623  }
624  }
625 
626  // Update the max number that can access one file node if the default is used
627  if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
628 
629  // Done
630  return nwrks;
631 }
632 
633 ////////////////////////////////////////////////////////////////////////////////
634 /// Get next unallocated file.
635 
636 TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
637 {
638  TFileStat *file = 0;
639 
640  if (node != 0) {
641  file = node->GetNextUnAlloc();
642  if (file == 0) RemoveUnAllocNode(node);
643  } else {
644  while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
645  file = node->GetNextUnAlloc();
646  if (file == 0) RemoveUnAllocNode(node);
647  }
648  }
649 
650  if (file != 0) {
651  // if needed make node active
652  if (fActive->FindObject(node) == 0) {
653  fActive->Add(node);
654  }
655  }
656 
657  return file;
658 }
659 
660 ////////////////////////////////////////////////////////////////////////////////
661 /// Get next unallocated node.
662 
663 TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
664 {
665  fUnAllocated->Sort();
666  PDB(kPacketizer,2) {
667  std::cout << "TPacketizer::NextUnAllocNode()" << std::endl;
668  fUnAllocated->Print();
669  }
670 
671  TFileNode *fn = (TFileNode*) fUnAllocated->First();
672  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
673  PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
674  fMaxSlaveCnt);
675  fn = 0;
676  }
677 
678  return fn;
679 }
680 
681 ////////////////////////////////////////////////////////////////////////////////
682 /// Remove unallocated node.
683 
684 void TPacketizer::RemoveUnAllocNode(TFileNode * node)
685 {
686  fUnAllocated->Remove(node);
687 }
688 
689 ////////////////////////////////////////////////////////////////////////////////
690 /// Get next active file.
691 
692 TPacketizer::TFileStat *TPacketizer::GetNextActive()
693 {
694  TFileNode *node;
695  TFileStat *file = 0;
696 
697  while (file == 0 && ((node = NextActiveNode()) != 0)) {
698  file = node->GetNextActive();
699  if (file == 0) RemoveActiveNode(node);
700  }
701 
702  return file;
703 }
704 
705 ////////////////////////////////////////////////////////////////////////////////
706 /// Get next active node.
707 
708 TPacketizer::TFileNode *TPacketizer::NextActiveNode()
709 {
710  fActive->Sort();
711  PDB(kPacketizer,2) {
712  Printf("TPacketizer::NextActiveNode : ----------------------");
713  fActive->Print();
714  }
715 
716  TFileNode *fn = (TFileNode*) fActive->First();
717  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
718  PDB(kPacketizer,1)
719  Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
720  fn = 0;
721  }
722 
723  return fn;
724 }
725 
726 ////////////////////////////////////////////////////////////////////////////////
727 /// Remove file from the list of actives.
728 
729 void TPacketizer::RemoveActive(TFileStat *file)
730 {
731  TFileNode *node = file->GetNode();
732 
733  node->RemoveActive(file);
734  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
735 }
736 
737 ////////////////////////////////////////////////////////////////////////////////
738 /// Remove node from the list of actives.
739 
740 void TPacketizer::RemoveActiveNode(TFileNode *node)
741 {
742  fActive->Remove(node);
743 }
744 
745 ////////////////////////////////////////////////////////////////////////////////
746 /// Reset the internal datastructure for packet distribution.
747 
749 {
   // Empties the unallocated and active node lists, resets every file node,
   // then re-attaches each worker to the file node carrying the worker's name
   // (if there is one) and clears the worker's current file.
   // NOTE(review): the numbering embedded in this listing skips 748 and 751,
   // so the signature line and one statement after the first Clear() appear
   // to be missing from this view; nothing visible here refills fUnAllocated.
750  fUnAllocated->Clear();
752 
753  fActive->Clear();
754 
   // Put every file node back into its initial state (cursors and counters)
755  TIter files(fFileNodes);
756  TFileNode *fn;
757  while ((fn = (TFileNode*) files.Next()) != 0) {
758  fn->Reset();
759  }
760 
   // fSlaveStats is iterated by key; the TSlaveStat is the value mapped to it
761  TIter slaves(fSlaveStats);
762  TObject *key;
763  while ((key = slaves.Next()) != 0) {
764  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
765  if (slstat) {
   // A node whose name matches the worker's name counts this worker as local
766  fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
767  if (fn != 0 ) {
768  slstat->SetFileNode(fn);
769  fn->IncMySlaveCnt();
770  }
771  slstat->fCurFile = 0;
772  } else {
773  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
774  }
775  }
776 }
777 
778 ////////////////////////////////////////////////////////////////////////////////
779 /// Check existence of file/dir/tree and get number of entries.
780 /// Assumes the files have been setup.
781 
782 void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
783 {
   // Determines the number of entries of the elements of 'dset' using the
   // workers in 'slaves'. Each free worker is given the next unallocated file
   // (from its own file node first, from any node otherwise). If the element
   // already reports a non-negative entry count and a non-empty title, the
   // result is used directly; otherwise a request is sent over the worker's
   // socket and the function waits on a TMonitor for the kPROOF_GETENTRIES
   // reply. Elements are marked valid or invalidated accordingly, and at the
   // end the per-element counts stored with SetTDSetOffset() are turned into
   // cumulative offsets.
   //
   //   dset    dataset whose elements are validated
   //   slaves  list of TSlave objects to use
   //   maxent  if negative, workers are always re-queued after a reply;
   //           otherwise only while fewer than 'maxent' entries were counted
   //   byfile  if true, workers are not re-queued after a reply; instead the
   //           number of additional files to open is estimated whenever no
   //           request is pending
   //
   // On an interrupted select, an invalid socket, a failed receive or a
   // kPROOF_FATAL message fValid is set to kFALSE; the function then returns
   // without computing the offsets.
   //
   // NOTE(review): the numbering embedded in this listing skips 859, 887 and
   // 1058, so three statements appear to be missing from this view (among
   // them, presumably, the declarations of the two TMessage objects 'm').
784  TMap slaves_by_sock;
785  TMonitor mon;
786  TList workers;
787 
788 
789  // Setup the communication infrastructure
790 
791  workers.AddAll(slaves);
792  TIter si(slaves);
793  TSlave *slm = 0;
794  while ((slm = (TSlave*)si.Next()) != 0) {
795  PDB(kPacketizer,3)
796  Info("ValidateFiles","socket added to monitor: %p (%s)",
797  slm->GetSocket(), slm->GetName());
798  mon.Add(slm->GetSocket());
799  slaves_by_sock.Add(slm->GetSocket(), slm);
800  PDB(kPacketizer,1)
801  Info("ValidateFiles",
802  "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
803  }
804 
   // Sockets are activated one by one, only while a request is pending on them
805  mon.DeActivateAll();
806 
807  ((TProof*)gProof)->DeActivateAsyncInput();
808 
809  // Some monitoring systems (TXSocketHandler) need to know this
810  ((TProof*)gProof)->fCurrentMonitor = &mon;
811 
812  // Preparing for client notification
813  TString msg("Validating files");
814  UInt_t n = 0;
815  UInt_t tot = dset->GetListOfElements()->GetSize();
816  Bool_t st = kTRUE;
817 
   // totent: entries counted from replies so far; nopenf: number of such replies
818  Long64_t totent = 0, nopenf = 0;
819  while (kTRUE) {
820 
821  // send work
822  while( TSlave *s = (TSlave*)workers.First() ) {
823 
824  workers.Remove(s);
825 
826  // find a file
827 
828  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
829  if (!slstat) {
830  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
831  continue;
832  }
833  TFileNode *node = 0;
834  TFileStat *file = 0;
835 
836  // try its own node first
837  if ( (node = slstat->GetFileNode()) != 0 ) {
838  file = GetNextUnAlloc(node);
839  if ( file == 0 ) {
   // nothing left on the worker's own node: detach the worker from it
840  slstat->SetFileNode(0);
841  }
842  }
843 
844  // look for a file on any other node if necessary
845  if (file == 0) {
846  file = GetNextUnAlloc();
847  }
848 
849  if ( file != 0 ) {
850  // files are done right away
851  RemoveActive(file);
852 
853  slstat->fCurFile = file;
854  TDSetElement *elem = file->GetElement();
855  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
   // Unknown entry count or empty title: the worker has to be asked
856  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
857  // This is decremented when we get the reply
858  file->GetNode()->IncSlaveCnt(slstat->GetName());
860  m << dset->IsTree()
861  << TString(elem->GetFileName())
862  << TString(elem->GetDirectory())
863  << TString(elem->GetObjName());
864 
865  s->GetSocket()->Send( m );
866  mon.Activate(s->GetSocket());
867  PDB(kPacketizer,2)
868  Info("ValidateFiles",
869  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
870  s->GetOrdinal(), s->GetName(), s->GetSocket(),
871  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
872  elem->GetDirectory(), elem->GetObjName());
873  } else {
874  // Fill the info
   // The entry count is stored here temporarily; it is converted into an
   // offset by the loop at the end of this function
875  elem->SetTDSetOffset(entries);
876  if (entries > 0) {
877  // Most likely valid
878  elem->SetValid();
879  if (!elem->GetEntryList()) {
880  if (elem->GetFirst() > entries) {
881  Error("ValidateFiles",
882  "first (%lld) higher then number of entries (%lld) in %s",
883  elem->GetFirst(), entries, elem->GetFileName());
884  // disable element
885  slstat->fCurFile->SetDone();
886  elem->Invalidate();
888  }
   // Resolve an open-ended range (-1) or clip a range exceeding the file
889  if (elem->GetNum() == -1) {
890  elem->SetNum(entries - elem->GetFirst());
891  } else if (elem->GetFirst() + elem->GetNum() > entries) {
892  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
893  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
894  entries, elem->GetFileName());
895  elem->SetNum(entries - elem->GetFirst());
896  }
897  PDB(kPacketizer,2)
898  Info("ValidateFiles",
899  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
900  }
901  }
902  // Notify the client
903  n++;
904  gProof->SendDataSetStatus(msg, n, tot, st);
905 
906  // This worker is ready for the next validation
907  workers.Add(s);
908  }
909  }
910  }
911 
912  // Check if there is anything to wait for
913  if (mon.GetActive() == 0) {
914  if (byfile && maxent > 0 && totent > 0) {
915  // How many files do we still need ?
   // Extrapolates from the average number of entries per file seen so far
916  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
917  if (nrestf <= 0 && maxent > totent) nrestf = 1;
918  if (nrestf > 0) {
919  PDB(kPacketizer,3)
920  Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
921  maxent, totent, nopenf, nrestf);
   // Re-queue workers, starting from the first one, until the list ends or
   // the estimate is exhausted
922  si.Reset();
923  while ((slm = (TSlave *) si.Next()) && nrestf--) {
924  workers.Add(slm);
925  }
926  continue;
927  } else {
928  PDB(kPacketizer,3)
929  Info("ValidateFiles", "no need to validate more files");
930  break;
931  }
932  } else {
933  break;
934  }
935  }
936 
937  PDB(kPacketizer,3) {
938  Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
939  TList *act = mon.GetListOfActives();
940  TIter next(act);
941  TSocket *s = 0;
942  while ((s = (TSocket*) next())) {
943  Info("ValidateFiles", "found sck: %p", s);
944  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
945  if (sl)
946  Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
947  }
948  delete act;
949  }
950 
   // Wait until one of the activated sockets has something to read
951  TSocket *sock = mon.Select();
952  // If we have been interrupted break
953  if (!sock) {
954  Error("ValidateFiles", "selection has been interrupted - STOP");
955  mon.DeActivateAll();
956  fValid = kFALSE;
957  break;
958  }
959  mon.DeActivate(sock);
960 
961  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
962 
963  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
964  if (!sock->IsValid()) {
965  // A socket got invalid during validation
966  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
967  slave->GetOrdinal(), slave->GetName());
968  ((TProof*)gProof)->MarkBad(slave);
969  fValid = kFALSE;
970  break;
971  }
972 
973  TMessage *reply;
974 
975  if ( sock->Recv(reply) <= 0 ) {
976  // Help! lost a slave?
977  ((TProof*)gProof)->MarkBad(slave);
978  fValid = kFALSE;
979  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
980  slave->GetOrdinal(), slave->GetName());
981  continue;
982  }
983 
984  if (reply->What() != kPROOF_GETENTRIES) {
985  // Not what we want: handover processing to the central machinery
   // The message type is saved first because 'reply' is handed over below
986  Int_t what = reply->What();
987  ((TProof*)gProof)->HandleInputMessage(slave, reply);
988  if (what == kPROOF_FATAL) {
989  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
990  slave->GetOrdinal(), slave->GetName());
991  fValid = kFALSE;
992  } else {
993  // Reactivate the socket
994  mon.Activate(sock);
995  }
996  // Get next message
997  continue;
998  }
999 
   // A GETENTRIES reply: it refers to the file recorded in the worker's
   // fCurFile when the request was sent
1000  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1001  TDSetElement *e = slavestat->fCurFile->GetElement();
1002  slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1003  Long64_t entries;
1004 
1005  (*reply) >> entries;
1006 
1007  // Extract object name, if there
1008  if ((reply->BufferSize() > reply->Length())) {
1009  TString objname;
1010  (*reply) >> objname;
1011  e->SetTitle(objname);
1012  }
1013 
   // As above, the entry count is parked in the offset field for now
1014  e->SetTDSetOffset(entries);
1015  if ( entries > 0 ) {
1016 
1017  // This dataset element is most likely valid
1018  e->SetValid();
1019 
1020  //if (!e->GetEventList()) {
1021  if (!e->GetEntryList()){
1022  if ( e->GetFirst() > entries ) {
1023  Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
1024  e->GetFirst(), entries, e->GetFileName());
1025 
1026  // Invalidate the element
1027  slavestat->fCurFile->SetDone();
1028  e->Invalidate();
1029  dset->SetBit(TDSet::kSomeInvalid);
1030  }
1031 
1032  if ( e->GetNum() == -1 ) {
1033  e->SetNum( entries - e->GetFirst() );
1034  } else if ( e->GetFirst() + e->GetNum() > entries ) {
1035  Error("ValidateFiles",
1036  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1037  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1038  e->SetNum(entries - e->GetFirst());
1039  }
1040  }
1041 
1042  // Count
1043  totent += entries;
1044  nopenf++;
1045 
1046  // Notify the client
1047  n++;
1048  gProof->SendDataSetStatus(msg, n, tot, st);
1049 
1050  } else {
1051 
1052  Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
1053  //
1054  // Need to fix this with a user option to allow incomplete file sets (rdm)
1055  //
1056  //fValid = kFALSE; // all element must be readable!
1057  if (gProofServ) {
1059  m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
1060  gProofServ->GetSocket()->Send(m);
1061  }
1062 
1063  // Invalidate the element
1064  e->Invalidate();
1065  dset->SetBit(TDSet::kSomeInvalid);
1066  }
1067  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1068 
1069  // Ready for the next job, unless we have enough files
1070  if (maxent < 0 || ((totent < maxent) && !byfile))
1071  workers.Add(slave);
1072  }
1073 
1074  // report std. output from slaves??
1075 
1076  ((TProof*)gProof)->ActivateAsyncInput();
1077 
1078  // This needs to be reset
1079  ((TProof*)gProof)->fCurrentMonitor = 0;
1080 
1081  // No reason to continue if invalid
1082  if (!fValid)
1083  return;
1084 
1085 
1086  // compute the offset for each file element
   // Each element's stored value (its entry count at this point) is replaced
   // by the sum of the values of all elements preceding it in the list
1087  Long64_t offset = 0;
1088  Long64_t newOffset = 0;
1089  TIter next(dset->GetListOfElements());
1090  TDSetElement *el;
1091  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1092  newOffset = offset + el->GetTDSetOffset();
1093  el->SetTDSetOffset(offset);
1094  offset = newOffset;
1095  }
1096 }
1097 
1098 ////////////////////////////////////////////////////////////////////////////////
1099 /// Get entries processed by the specified slave.
/// Looks `slave` up in the fSlaveStats map and returns the value reported by
/// the matching TSlaveStat's GetEntriesProcessed().
/// Returns 0 when fSlaveStats is null or when the map has no value for `slave`.
1100 
// NOTE(review): the listing numbering jumps from 1100 to 1102 here; the line
// carrying the function signature is missing from this extract.
1102 {
 // No per-worker statistics map: nothing can have been processed
1103  if ( fSlaveStats == 0 ) return 0;
1104 
 // The map is keyed by the slave object; the value is its TSlaveStat
1105  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1106 
 // Unknown slave: report zero rather than dereferencing a null pointer
1107  if ( slstat == 0 ) return 0;
1108 
1109  return slstat->GetEntriesProcessed();
1110 }
1111 
1112 ////////////////////////////////////////////////////////////////////////////////
1113 /// Get Estimation of the current rate; just summing the current rates of
1114 /// the active workers
/// `all` is an output flag: it is set to kTRUE on entry and reset to kFALSE as
/// soon as one entry of fSlaveStats has no TSlaveStat, no progress status, or
/// no processed entries yet (such an entry contributes nothing to the sum).
/// Returns the accumulated rate; 0 when fSlaveStats is null or empty.
1115 
// NOTE(review): the listing numbering jumps from 1115 to 1117 here; the line
// carrying the function signature is missing from this extract.
1117 {
1118  all = kTRUE;
1119  // Loop over the workers
1120  Float_t currate = 0.;
1121  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1122  TIter nxw(fSlaveStats);
1123  TObject *key;
1124  while ((key = nxw()) != 0) {
 // Iterating the map yields the keys; fetch the associated statistics
1125  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1126  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1127  // Sum-up the current rates
1128  currate += slstat->GetProgressStatus()->GetCurrentRate();
1129  } else {
 // At least one worker could not be included in the estimate
1130  all = kFALSE;
1131  }
1132  }
1133  }
1134  // Done
1135  return currate;
1136 }
1137 
1138 ////////////////////////////////////////////////////////////////////////////////
1139 /// Get next packet
/// Works in three steps for the worker `sl`:
///  1. if the worker still holds a packet (slstat->fCurElem), record it in
///     fPackets, read the processing statistics from the message `r`, update
///     the per-worker and global progress counters, and release the packet;
///  2. make sure the worker has a file to work on, taking a new one with
///     GetNextUnAlloc() / GetNextActive() when the current one is done;
///  3. cut the next entry range out of that file with CreateNewPacket().
/// Returns the new packet (also stored in slstat->fCurElem), or 0 when the
/// packetizer is not valid, when fStop is set, or when no file is left.
1140 
// NOTE(review): the listing numbering jumps from 1140 to 1142 here; the line
// carrying the function signature is missing from this extract.
1142 {
1143  if ( !fValid ) {
1144  return 0;
1145  }
1146 
1147  // Find worker
1148 
1149  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1150 
 // A worker without an entry in fSlaveStats is treated as a programming error
1151  R__ASSERT( slstat != 0 );
1152 
1153  PDB(kPacketizer,1)
1154  Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1155  // update stats & free old element
1156 
1157  Bool_t firstPacket = kFALSE;
1158  if ( slstat->fCurElem != 0 ) {
1159  Double_t latency = 0., proctime = 0., proccpu = 0.;
1160  Long64_t bytesRead = -1;
1161  Long64_t totalEntries = -1;
1162  Long64_t totev = 0;
 // Default to the size of the packet that was handed out; overwritten below
 // when the message carries the worker's own counters
1163  Long64_t numev = slstat->fCurElem->GetNum();
1164 
1165  fPackets->Add(slstat->fCurElem);
1166 
 // The layout of the message depends on the protocol version of the worker
1167  if (sl->GetProtocol() > 18) {
1168  TProofProgressStatus *status = 0;
1169  (*r) >> latency;
1170  (*r) >> status;
1171 
1172  // Calculate the progress made in the last packet
1173  TProofProgressStatus *progress = 0;
1174  if (status) {
1175  // update the worker status
 // numev must be computed before AddProcessed() is called on slstat
1176  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1177  progress = slstat->AddProcessed(status);
1178  if (progress) {
1179  // (*fProgressStatus) += *progress;
1180  proctime = progress->GetProcTime();
1181  proccpu = progress->GetCPUTime();
1182  totev = status->GetEntries(); // for backward compatibility
1183  bytesRead = progress->GetBytesRead();
1184  delete progress;
1185  }
1186  delete status;
1187  } else
1188  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
1189  } else {
1190 
 // Older message layout: plain numbers, read in this fixed order
1191  (*r) >> latency >> proctime >> proccpu;
1192 
1193  // only read new info if available
1194  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1195  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1196  if (r->BufferSize() > r->Length()) (*r) >> totev;
1197 
 // totev is handled as a running total: the difference with what is
 // already accounted for this worker is the size of the last packet
1198  numev = totev - slstat->GetEntriesProcessed();
1199  if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1200  if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1201  if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1202  }
1203 
 // Mirror the same increments in the query-wide progress status
1204  if (fProgressStatus) {
1205  if (numev > 0) fProgressStatus->IncEntries(numev);
1206  if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1207  if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1208  }
1209  PDB(kPacketizer,2)
1210  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1211  sl->GetOrdinal(), sl->GetName(),
1212  numev, latency, proctime, proccpu, bytesRead);
1213 
1214  if (gPerfStats)
1215  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1216  numev, latency, proctime, proccpu, bytesRead);
1217 
 // The packet now lives in fPackets; the worker no longer holds one
1218  slstat->fCurElem = 0;
 // NOTE(review): listing line 1219 is absent from this extract; the closing
 // brace after the next two lines pairs with a statement that is not visible here.
1220  HandleTimer(0); // Send last timer message
1221  delete fProgress; fProgress = 0;
1222  }
1223  } else {
 // No packet was outstanding for this worker, so the one created below is its first
1224  firstPacket = kTRUE;
1225  }
1226 
 // A stop was requested: hand out no more work
1227  if ( fStop ) {
1228  HandleTimer(0);
1229  return 0;
1230  }
1231 
1232  // get a file if needed
1233 
1234  TFileStat *file = slstat->fCurFile;
1235 
 // The current file is exhausted: release the worker's slot on its node
1236  if ( file != 0 && file->IsDone() ) {
1237  file->GetNode()->DecSlaveCnt(slstat->GetName());
1238  if (gPerfStats)
1239  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1240  file->GetElement()->GetFileName(), kFALSE);
1241  file = 0;
1242  }
1243  // Reset the current file field
1244  slstat->fCurFile = file;
1245 
1246  if (!file) {
1247 
1248  // Try its own node first
1249  if (slstat->GetFileNode() != 0) {
1250  file = GetNextUnAlloc(slstat->GetFileNode());
1251  if (!file) {
 // Nothing left on the worker's own node: stop looking there
1252  slstat->SetFileNode(0);
1253  }
1254  }
1255 
1256  // try to find an unused filenode first
1257  if (!file) {
1258  file = GetNextUnAlloc();
1259  }
1260 
1261  // then look at the active filenodes
1262  if (!file) {
1263  file = GetNextActive();
1264  }
1265 
 // No file available anywhere: no packet for this worker
1266  if (!file) return 0;
1267 
1268  slstat->fCurFile = file;
1269  file->GetNode()->IncSlaveCnt(slstat->GetName());
1270  if (gPerfStats)
1271  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1272  file->GetNode()->GetName(),
1273  file->GetElement()->GetFileName(), kTRUE);
1274  }
1275 
1276  // get a packet
1277 
1278  TDSetElement *base = file->GetElement();
 // Scale the nominal packet size by the worker's performance index relative
 // to fMaxPerfIdx; always hand out at least one entry
1279  Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
1280  if (num < 1) num = 1;
1281 
1282  Long64_t first = file->GetNextEntry();
 // One past the last entry of the element
1283  Long64_t last = base->GetFirst() + base->GetNum();
1284 
 // The packet would reach the end of the file: give out the remainder only
1285  if ( first + num >= last ) {
1286  num = last - first;
1287  file->SetDone(); // done
1288 
1289  // delete file from active list (unalloc list is single pass, no delete needed)
1290  RemoveActive(file);
1291 
1292  } else {
1293  file->MoveNextEntry(num);
1294  }
1295 
1296 
1297  slstat->fCurElem = CreateNewPacket(base, first, num);
 // Propagate the entry list of the base element, restricted to this range
1298  if (base->GetEntryList())
1299  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1300 
1301  // Flag the first packet of a new run (dataset)
1302  if (firstPacket)
1303  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1304  else
1305  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1306 
1307  PDB(kPacketizer,2)
1308  Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1309 
1310  return slstat->fCurElem;
1311 }
1312 
1313 ////////////////////////////////////////////////////////////////////////////////
1314 /// Return the number of workers still processing
/// A worker is counted when its TSlaveStat in fSlaveStats has a non-null
/// current file (fCurFile).
1315 
// NOTE(review): the listing numbering jumps from 1315 to 1317 here; the line
// carrying the function signature is missing from this extract.
1317 {
1318  Int_t actw = 0;
1319  TIter nxw(fSlaveStats);
1320  TObject *key;
1321  while ((key = nxw())) {
 // Iterating the map yields the keys; fetch the associated statistics
1322  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1323  if (wrkstat && wrkstat->fCurFile) actw++;
1324  }
1325  // Done
1326  return actw;
1327 }
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
virtual Bool_t IsValid() const
Definition: TSocket.h:146
Long64_t GetTDSetOffset() const
Definition: TDSet.h:128
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:847
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:410
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
long long Long64_t
Definition: RtypesCore.h:69
virtual Long64_t GetN() const
Definition: TEntryList.h:75
Int_t fPacketAsAFraction
Definition: TPacketizer.h:60
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:520
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
Int_t GetActiveWorkers()
Return the number of workers still processing.
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:129
This class represents a WWW compatible URL.
Definition: TUrl.h:35
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
const char * GetProtocol() const
Definition: TUrl.h:67
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t GetValid() const
Definition: TDSet.h:119
TSocket * GetSocket() const
Definition: TProofServ.h:257
Double_t GetProcTime() const
virtual Int_t GetEntries() const
Definition: TCollection.h:86
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
Definition: TCollection.cxx:58
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:53
#define R__ASSERT(e)
Definition: TError.h:96
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Long64_t GetFirst() const
Definition: TDSet.h:112
const char * GetOrdinal() const
Definition: TSlave.h:131
Basic string class.
Definition: TString.h:129
TList * fPackets
Definition: TPacketizer.h:44
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TPacketizer()
Destructor.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:543
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
Long64_t GetEntries() const
Int_t GetProtocol() const
Definition: TSlave.h:133
Bool_t IsValid() const
Definition: TUrl.h:82
TList * GetListOfElements() const
Definition: TDSet.h:229
void Reset()
Definition: TCollection.h:156
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:469
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:687
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:501
Int_t Length() const
Definition: TBuffer.h:94
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:775
This class generates packets to be processed on PROOF worker servers.
Definition: TPacketizer.h:39
Manages an element of a TDSet.
Definition: TDSet.h:66
const char * GetHost() const
Definition: TUrl.h:70
#define SafeDelete(p)
Definition: RConfig.h:499
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
Long64_t GetNum() const
Definition: TDSet.h:114
#define PDB(mask, level)
Definition: TProofDebug.h:56
Int_t GetPerfIdx() const
Definition: TSlave.h:132
TSocket * GetSocket() const
Definition: TSlave.h:134
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
void IncEntries(Long64_t entries=1)
void IncBytesRead(Long64_t bytesRead)
virtual Bool_t HandleTimer(TTimer *timer)
Execute action in response of a timer timing out.
Definition: TObject.cxx:418
virtual Int_t GetN() const
Definition: TEventList.h:56
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:234
Long64_t fPacketSize
Definition: TPacketizer.h:53
TList * fFileNodes
Definition: TPacketizer.h:49
void Reset()
Reset the internal datastructure for packet distribution.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:43
const char * GetObjName() const
Definition: TDSet.h:120
TList * fUnAllocated
Definition: TPacketizer.h:50
void SetLastEntries(Long64_t entries)
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9308
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
const char * GetName() const
Return static name for TOutputListSelectorDataMap objects.
Bool_t IsTree() const
Definition: TDSet.h:223
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:561
TRandom2 r(17)
virtual Bool_t IsSortable() const
Definition: TObject.h:120
R__EXTERN TSystem * gSystem
Definition: TSystem.h:539
Int_t fMaxPerfIdx
Definition: TPacketizer.h:56
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:482
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:679
TObject * Next()
Definition: TCollection.h:153
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:873
char * Form(const char *fmt,...)
void Invalidate()
Definition: TDSet.h:134
TFileNode * NextUnAllocNode()
Get next unallocated node.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:293
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:166
void Reset(Detail::TBranchProxy *x)
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
Definition: TDSet.cxx:242
Long_t fMaxSlaveCnt
Definition: TPacketizer.h:58
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:150
#define Printf
Definition: TGeoToOCC.h:18
#define gPerfStats
const Bool_t kFALSE
Definition: RtypesCore.h:92
UInt_t What() const
Definition: TMessage.h:74
void SetHost(const char *host)
Definition: TUrl.h:87
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
R__EXTERN TProof * gProof
Definition: TProof.h:1081
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
void Add(THist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const THist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
Definition: THist.hxx:336
#define ClassImp(name)
Definition: Rtypes.h:336
TObject * GetEntryList() const
Definition: TDSet.h:131
Bool_t fHeuristicPSiz
Definition: TPacketizer.h:68
double f(double x)
void SetValid()
Definition: TDSet.h:135
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:310
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
R__EXTERN TEnv * gEnv
Definition: TEnv.h:170
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:320
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:572
void SetNum(Long64_t num)
Definition: TDSet.h:118
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
const char * GetFileName() const
Definition: TDSet.h:111
Int_t BufferSize() const
Definition: TBuffer.h:92
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:353
Bool_t IsNull() const
Definition: TString.h:385
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Mother of all ROOT objects.
Definition: TObject.h:37
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:214
Long64_t GetBytesRead() const
const char * GetDataSet() const
Definition: TDSet.h:122
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
virtual void Add(TObject *obj)
Definition: TList.h:77
Definition: file.py:1
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
TList * fActive
Definition: TPacketizer.h:51
Definition: first.py:1
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:364
virtual Int_t GetSize() const
Definition: TCollection.h:89
Class describing a PROOF worker server.
Definition: TSlave.h:46
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
void SetFirst(Long64_t first)
Definition: TDSet.h:113
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:155
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:25
const Bool_t kTRUE
Definition: RtypesCore.h:91
const Int_t n
Definition: legend1.C:16
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:859
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
Bool_t fDefMaxWrkNode
Definition: TPacketizer.h:69
const char * Data() const
Definition: TString.h:347
TFileNode * NextActiveNode()
Get next active node.