ROOT  6.06/09
Reference Guide
TPacketizer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 18/03/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TPacketizer //
15 // //
16 // This class generates packets to be processed on PROOF worker servers.//
17 // A packet is an event range (begin entry and number of entries) or //
18 // object range (first object and number of objects) in a TTree //
19 // (entries) or a directory (objects) in a file. //
20 // Packets are generated taking into account the performance of the //
21 // remote machine, the time it took to process a previous packet on //
22 // the remote machine, the locality of the database files, etc. //
23 // //
24 //////////////////////////////////////////////////////////////////////////
25 
26 
27 #include "TPacketizer.h"
28 
29 #include "Riostream.h"
30 #include "TDSet.h"
31 #include "TEnv.h"
32 #include "TError.h"
33 #include "TEventList.h"
34 #include "TEntryList.h"
35 #include "TMap.h"
36 #include "TMessage.h"
37 #include "TMonitor.h"
38 #include "TNtupleD.h"
39 #include "TObject.h"
40 #include "TParameter.h"
41 #include "TPerfStats.h"
42 #include "TProofDebug.h"
43 #include "TProof.h"
44 #include "TProofPlayer.h"
45 #include "TProofServ.h"
46 #include "TSlave.h"
47 #include "TSocket.h"
48 #include "TTimer.h"
49 #include "TUrl.h"
50 #include "TClass.h"
51 #include "TMath.h"
52 #include "TObjString.h"
53 
54 //
55 // The following three utility classes manage the state of the
56 // work to be performed and the slaves involved in the process.
57 // A list of TFileNode(s) describes the hosts with files, each
58 // has a list of TFileStat(s) keeping the state for each TDSet
59 // element (file).
60 //
61 // The list of TSlaveStat(s) keeps track of the work (being) done
62 // by each slave
63 //
64 
65 
66 //------------------------------------------------------------------------------
67 
68 class TPacketizer::TFileStat : public TObject {
69 
70 private:
71  Bool_t fIsDone; // is this element processed
72  TFileNode *fNode; // my FileNode
73  TDSetElement *fElement; // location of the file and its range
74  Long64_t fNextEntry; // cursor in the range, -1 when done
75 
76 public:
77  TFileStat(TFileNode *node, TDSetElement *elem);
78 
79  Bool_t IsDone() const {return fIsDone;}
80  void SetDone() {fIsDone = kTRUE;}
81  TFileNode *GetNode() const {return fNode;}
82  TDSetElement *GetElement() const {return fElement;}
83  Long64_t GetNextEntry() const {return fNextEntry;}
84  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
85 };
86 
87 
88 TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
89  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
90 {
91 }
92 
93 
94 //------------------------------------------------------------------------------
95 
96 class TPacketizer::TFileNode : public TObject {
97 
98 private:
99  TString fNodeName; // FQDN of the node
100  TList *fFiles; // TDSetElements (files) stored on this node
101  TObject *fUnAllocFileNext; // cursor in fFiles
102  TList *fActFiles; // files with work remaining
103  TObject *fActFileNext; // cursor in fActFiles
104  Int_t fMySlaveCnt; // number of slaves running on this node
105  Int_t fSlaveCnt; // number of external slaves processing files on this node
106 
107 public:
108  TFileNode(const char *name);
109  ~TFileNode() { delete fFiles; delete fActFiles; }
110 
111  void IncMySlaveCnt() { fMySlaveCnt++; }
112  void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
113  void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
114  Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
115  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
116  Bool_t IsSortable() const { return kTRUE; }
117 
118  const char *GetName() const { return fNodeName.Data(); }
119 
120  void Add(TDSetElement *elem)
121  {
122  TFileStat *f = new TFileStat(this,elem);
123  fFiles->Add(f);
124  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
125  }
126 
127  TFileStat *GetNextUnAlloc()
128  {
129  TObject *next = fUnAllocFileNext;
130 
131  if (next != 0) {
132  // make file active
133  fActFiles->Add(next);
134  if (fActFileNext == 0) fActFileNext = fActFiles->First();
135 
136  // move cursor
137  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
138  }
139 
140  return (TFileStat *) next;
141  }
142 
143  TFileStat *GetNextActive()
144  {
145  TObject *next = fActFileNext;
146 
147  if (fActFileNext != 0) {
148  fActFileNext = fActFiles->After(fActFileNext);
149  if (fActFileNext == 0) fActFileNext = fActFiles->First();
150  }
151 
152  return (TFileStat *) next;
153  }
154 
155  void RemoveActive(TFileStat *file)
156  {
157  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
158  fActFiles->Remove(file);
159  if (fActFileNext == 0) fActFileNext = fActFiles->First();
160  }
161 
162  Int_t Compare(const TObject *other) const
163  {
164  // Must return -1 if this is smaller than obj, 0 if objects are equal
165  // and 1 if this is larger than obj.
166  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
167  if (!obj) {
168  Error("Compare", "input is not a TPacketizer::TFileNode object");
169  return 0;
170  }
171 
172  Int_t myVal = GetSlaveCnt();
173  Int_t otherVal = obj->GetSlaveCnt();
174  if (myVal < otherVal) {
175  return -1;
176  } else if (myVal > otherVal) {
177  return 1;
178  } else {
179  return 0;
180  }
181  }
182 
183  void Print(Option_t *) const
184  {
185  std::cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
186  << "\tMySlaveCount " << fMySlaveCnt
187  << "\tSlaveCount " << fSlaveCnt << std::endl;
188  }
189 
190  void Reset()
191  {
192  fUnAllocFileNext = fFiles->First();
193  fActFiles->Clear();
194  fActFileNext = 0;
195  fSlaveCnt = 0;
196  fMySlaveCnt = 0;
197  }
198 };
199 
200 
201 TPacketizer::TFileNode::TFileNode(const char *name)
202  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
203  fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
204 {
205  // Constructor
206 
207  fFiles->SetOwner();
208  fActFiles->SetOwner(kFALSE);
209 }
210 
211 
212 //------------------------------------------------------------------------------
213 
214 class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
215 
216 friend class TPacketizer;
217 
218 private:
219  TFileNode *fFileNode; // corresponding node or 0
220  TFileStat *fCurFile; // file currently being processed
221  TDSetElement *fCurElem; // TDSetElement currently being processed
223 public:
224  TSlaveStat(TSlave *slave);
225  ~TSlaveStat();
226 
227  TFileNode *GetFileNode() const { return fFileNode; }
228 
229  void SetFileNode(TFileNode *node) { fFileNode = node; }
230 };
231 
232 
233 TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
234  : fFileNode(0), fCurFile(0), fCurElem(0)
235 {
236  fSlave = slave;
237  fStatus = new TProofProgressStatus();
238 }
239 
240 ////////////////////////////////////////////////////////////////////////////////
241 /// Cleanup
242 
243 TPacketizer::TSlaveStat::~TSlaveStat()
244 {
245  SafeDelete(fStatus);
246 }
247 
248 TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
249 {
250  // Update the status info to the 'st'.
251  // return the difference (*st - *fStatus)
252 
253  if (st) {
254  // The entriesis not correct in 'st'
255  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
256  // The last proc time should not be added
257  fStatus->SetLastProcTime(0.);
258  // Get the diff
259  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
260  *fStatus += *diff;
261  // Set the correct value
262  fStatus->SetLastEntries(lastEntries);
263  return diff;
264  } else {
265  Error("AddProcessed", "status arg undefined");
266  return 0;
267  }
268 }
269 
270 //------------------------------------------------------------------------------
271 
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Constructor.
/// Builds the packetizer state for dataset 'dset' and the workers in 'slaves':
///  1. reads the per-node worker limit (PROOF_MaxSlavesPerNode parameter in
///     'input', else the Packetizer.MaxWorkersPerNode setting);
///  2. groups the not-yet-valid dataset elements per host into TFileNode
///     objects and registers the workers via AddWorkers();
///  3. calls ValidateFiles() and returns early if that cleared fValid;
///  4. rebuilds the per-host structure keeping only valid elements that
///     overlap the global range starting at 'first' with 'num' entries
///     ('num' == -1 is treated as "no upper limit" by the checks below);
///  5. determines the packet size (PROOF_PacketSize / PROOF_PacketAsAFraction).
/// 'input' and 'st' are forwarded to the TVirtualPacketizer constructor.
///
/// NOTE(review): the numbering embedded in this listing skips lines 290-291,
/// 333, 335, 381, 563, 566 and 576 inside this function; statements appear to
/// be missing at those places - compare with the upstream file before relying
/// on this text.
276 
277 TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first,
278  Long64_t num, TList *input, TProofProgressStatus *st)
279  : TVirtualPacketizer(input, st)
280 {
281  PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
282 
283  // Init pointer members
284  fPackets = 0;
285  fUnAllocated = 0;
286  fActive = 0;
287  fFileNodes = 0;
288  fMaxPerfIdx = 1;
289  fMaxSlaveCnt = 0;
 // NOTE(review): listing lines 290-291 are absent here. fHeuristicPSiz and
 // fDefMaxWrkNode are read later but never initialised in the visible code.
292 
293  if (!fProgressStatus) {
294  Error("TPacketizer", "No progress status");
295  return;
296  }
297 
 // Per-node worker limit: first look for a Long_t parameter, then for an
 // Int_t one; a negative value is reported and treated as "not set".
298  Long_t maxSlaveCnt = 0;
299  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
300  if (maxSlaveCnt < 0) {
301  Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
302  maxSlaveCnt = 0;
303  }
304  if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
305  } else {
306  // Try also with Int_t (recently supported in TProof::SetParameter)
307  Int_t mxslcnt = -1;
308  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
309  if (mxslcnt < 0) {
310  Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
311  mxslcnt = 0;
312  }
313  maxSlaveCnt = (Long_t) mxslcnt;
314  if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
315  }
316  }
 // No usable parameter: fall back to the environment setting, whose default
 // is the number of workers passed in.
 // NOTE(review): 'slaves' is dereferenced here without a null check.
317  if (!maxSlaveCnt) {
318  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize());
319  if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
320  }
321  if (maxSlaveCnt > 0) {
322  fMaxSlaveCnt = maxSlaveCnt;
323  PDB(kPacketizer,1)
324  Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
325  }
326 
 // Containers: fPackets and fFileNodes own their content
327  fPackets = new TList;
328  fPackets->SetOwner();
329 
330  fFileNodes = new TList;
331  fFileNodes->SetOwner();
332  fUnAllocated = new TList;
 // NOTE(review): listing line 333 is absent here
334  fActive = new TList;
 // NOTE(review): listing line 335 is absent here
336 
337 
338  fValid = kTRUE;
339 
340  // Resolve end-point urls to optimize distribution
341  // dset->Lookup(); // moved to TProofPlayerRemote::Process
342 
343  // Split into per host entries
 // Only elements that are not yet flagged valid are registered in this first
 // pass; these are the ones handed to ValidateFiles() below.
344  dset->Reset();
345  TDSetElement *e;
346  while ((e = (TDSetElement*)dset->Next())) {
347  if (e->GetValid()) continue;
348 
349  TUrl url = e->GetFileName();
350 
351  // Map non URL filenames to dummy host
352  TString host;
353  if ( !url.IsValid() ||
354  (strncmp(url.GetProtocol(),"root", 4) &&
355  strncmp(url.GetProtocol(),"rfio", 4) &&
356  strncmp(url.GetProtocol(),"file", 4)) ) {
357  host = "no-host";
358  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
359  host = "localhost";
360  url.SetProtocol("root");
361  } else {
 // NOTE(review): this pass uses GetHost() while the second pass below
 // uses GetHostFQDN() for the same purpose - confirm this is intended.
362  host = url.GetHost();
363  }
364  // Get full name for local hosts
365  if (host.Contains("localhost") || host == "127.0.0.1") {
366  url.SetHost(gSystem->HostName());
367  host = url.GetHostFQDN();
368  }
369 
370  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
371 
372  if (node == 0) {
373  node = new TFileNode(host);
374  fFileNodes->Add(node);
375  }
376 
377  node->Add( e );
378  }
379 
380  fSlaveStats = new TMap;
 // NOTE(review): listing line 381 is absent here
382 
383  // Record initial available workers
384  Int_t nwrks = AddWorkers(slaves);
385  Info("TPacketizer", "Initial number of workers: %d", nwrks);
386 
387  // Setup file & filenode structure
388  Reset();
389  // Optimize the number of files to be open when running on subsample
 // Validation "by file" is enabled only when the PROOF_ValidateByFile
 // parameter is found with a positive value and 'num' is larger than -1.
390  Int_t validateMode = 0;
391  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
392  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
393  if (num > -1)
394  PDB(kPacketizer,2)
395  Info("TPacketizer",
396  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
397  ValidateFiles(dset, slaves, num, byfile);
398 
399  if (!fValid) return;
400 
401  // apply global range (first,num) to dset and rebuild structure
402  // omitting TDSet elements that are not needed
403 
404  Int_t files = 0;
405  fTotalEntries = 0;
406  fUnAllocated->Clear(); // avoid dangling pointers
407  fActive->Clear();
408  fFileNodes->Clear(); // then delete all objects
409  PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
410 
 // 'cur' is the running count of entries of the elements seen so far
 // (elements carrying an entry list do not advance it).
411  dset->Reset();
412  Long64_t cur = 0;
413  while (( e = (TDSetElement*)dset->Next())) {
414 
415  // Skip invalid or missing file; It will be moved
416  // from the dset to the 'MissingFiles' list in the player.
417  if (!e->GetValid()) continue;
418 
419  // The dataset name, if any
420  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
421  fDataSet = e->GetDataSet();
422 
423  TUrl url = e->GetFileName();
424  Long64_t eFirst = e->GetFirst();
425  Long64_t eNum = e->GetNum();
426  PDB(kPacketizer,2)
427  Info("TPacketizer", " --> '%s'", e->GetFileName());
428  PDB(kPacketizer,2)
429  Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
430 
431  if (!e->GetEntryList()){
432  // this element is before the start of the global range, skip it
 // NOTE(review): with '<' an element ending exactly at 'first'
 // (cur + eNum == first) is not skipped; it is trimmed to zero entries
 // below and still added to its node - confirm this is intended.
433  if (cur + eNum < first) {
434  cur += eNum;
435  PDB(kPacketizer,2)
436  Info("TPacketizer", " --> skip element cur %lld", cur);
437  continue;
438  }
439 
440  // this element is after the end of the global range, skip it
441  if (num != -1 && (first+num <= cur)) {
442  cur += eNum;
443  PDB(kPacketizer,2)
444  Info("TPacketizer", " --> drop element cur %lld", cur);
445  continue; // break ??
446  }
447 
448  Bool_t inRange = kFALSE;
449  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
450 
451  if (cur <= first) {
452  // If this element contains the start of the global range
453  // adjust its start and number of entries
454  e->SetFirst( eFirst + (first - cur) );
455  e->SetNum( e->GetNum() - (first - cur) );
456  PDB(kPacketizer,2)
457  Info("TPacketizer", " --> adjust start %lld and end %lld",
458  eFirst + (first - cur), first + num - cur);
459  inRange = kTRUE;
460  }
461  if (num != -1 && (first+num <= cur+eNum)) {
462  // If this element contains the end of the global range
463  // adjust its number of entries
464  e->SetNum( first + num - e->GetFirst() - cur );
465  PDB(kPacketizer,2)
466  Info("TPacketizer", " --> adjust end %lld", first + num - cur);
467  inRange = kTRUE;
468  }
469 
470  } else {
471  // Increment the counter ...
472  PDB(kPacketizer,2)
473  Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
474  cur += eNum;
475  }
476  // Re-adjust eNum and cur, if needed
 // 'cur' advances by the original size of the element, while eNum
 // becomes the trimmed number of entries kept for processing.
477  if (inRange) {
478  cur += eNum;
479  eNum = e->GetNum();
480  }
481 
482  } else {
 // Element with an entry/event list: its size is the number of
 // selected entries; empty selections are dropped.
483  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
484  if (enl) {
485  eNum = enl->GetN();
486  } else {
487  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
488  eNum = evl ? evl->GetN() : eNum;
489  }
490  if (!eNum)
491  continue;
492  }
493  PDB(kPacketizer,2)
494  Info("TPacketizer", " --> next cur %lld", cur);
495 
496  // Map non URL filenames to dummy host
497  TString host;
498  if ( !url.IsValid() ||
499  (strncmp(url.GetProtocol(),"root", 4) &&
500  strncmp(url.GetProtocol(),"rfio", 4) &&
501  strncmp(url.GetProtocol(),"file", 4)) ) {
502  host = "no-host";
503  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
504  host = "localhost";
505  url.SetProtocol("root");
506  } else {
507  host = url.GetHostFQDN();
508  }
509  // Get full name for local hosts
510  if (host.Contains("localhost") || host == "127.0.0.1") {
511  url.SetHost(gSystem->HostName());
512  host = url.GetHostFQDN();
513  }
514 
515  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
516 
517  if ( node == 0 ) {
518  node = new TFileNode( host );
519  fFileNodes->Add( node );
520  }
521 
522  ++files;
523  fTotalEntries += eNum;
524  node->Add(e);
525  PDB(kPacketizer,2) e->Print("a");
526  }
527 
528  PDB(kPacketizer,1)
529  Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
530  fTotalEntries, files, fFileNodes->GetSize());
531 
532  // Set the total number for monitoring
533  if (gPerfStats)
534  gPerfStats->SetNumEvents(fTotalEntries);
535 
 // Re-initialise the distribution state for the rebuilt node list
536  Reset();
537 
538  if (fFileNodes->GetSize() == 0) {
539  Info("TPacketizer", "no valid or non-empty file found: setting invalid");
540  // No valid files: set invalid and return
541  fValid = kFALSE;
542  return;
543  }
544 
545  // Below we provide a possibility to change the way packet size is
546  // calculated or define the packet size directly.
547  // fPacketAsAFraction can be interpreted as follows:
548  // assuming all slaves have equal processing rate,
549  // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
550  // It substitutes 20 in the old formula to calculate the fPacketSize:
551  // fPacketSize = fTotalEntries / (20 * nslaves)
552  Long_t packetAsAFraction = 20;
553  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
554  Info("Process", "using alternate fraction of query time as a packet Size: %ld",
555  packetAsAFraction);
556  fPacketAsAFraction = (Int_t)packetAsAFraction;
557 
558  fPacketSize = 1;
559  if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
560  Info("Process","using alternate packet size: %lld", fPacketSize);
561  } else {
562  // Heuristic for starting packet size
 // NOTE(review): listing lines 563 and 566 are absent. As shown, this
 // branch only clamps fPacketSize to 1; the formula described in the
 // comment above is not applied anywhere in the visible code.
564  Int_t nslaves = fSlaveStats->GetSize();
565  if (nslaves > 0) {
567  if (fPacketSize < 1) fPacketSize = 1;
568  } else {
569  fPacketSize = 1;
570  }
571  }
572 
573  PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
574 
 // NOTE(review): listing line 576 (the statement guarded by this 'if') is
 // absent, so as shown the condition governs the PDB statement that follows.
575  if (!fValid)
577 
578  PDB(kPacketizer,1) Info("TPacketizer", "Return");
579 }
580 
581 ////////////////////////////////////////////////////////////////////////////////
582 /// Destructor.
583 
585 {
586  if (fSlaveStats) {
588  }
589 
595 }
596 
597 ////////////////////////////////////////////////////////////////////////////////
598 /// Adds new workers. Returns the number of workers added, or -1 on failure.
599 
601 {
602  if (!workers) {
603  Error("AddWorkers", "Null list of new workers!");
604  return -1;
605  }
606 
607  Int_t curNumOfWrks = fSlaveStats->GetEntries();
608 
609  TSlave *sl;
610  TIter next(workers);
611  while (( sl = dynamic_cast<TSlave*>(next()) ))
612  if (!fSlaveStats->FindObject(sl)) {
613  fSlaveStats->Add(sl, new TSlaveStat(sl));
615  }
616 
617  // If heuristic (and new workers) set the packet size
618  Int_t nwrks = fSlaveStats->GetSize();
619  if (fHeuristicPSiz && nwrks > curNumOfWrks) {
620  if (nwrks > 0) {
622  if (fPacketSize < 1) fPacketSize = 1;
623  } else {
624  fPacketSize = 1;
625  }
626  }
627 
628  // Update the max number that can access one file node if the default is used
629  if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
630 
631  // Done
632  return nwrks;
633 }
634 
635 ////////////////////////////////////////////////////////////////////////////////
636 /// Get next unallocated file.
637 
638 TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
639 {
640  TFileStat *file = 0;
641 
642  if (node != 0) {
643  file = node->GetNextUnAlloc();
644  if (file == 0) RemoveUnAllocNode(node);
645  } else {
646  while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
647  file = node->GetNextUnAlloc();
648  if (file == 0) RemoveUnAllocNode(node);
649  }
650  }
651 
652  if (file != 0) {
653  // if needed make node active
654  if (fActive->FindObject(node) == 0) {
655  fActive->Add(node);
656  }
657  }
658 
659  return file;
660 }
661 
662 ////////////////////////////////////////////////////////////////////////////////
663 /// Get next unallocated node.
664 
665 TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
666 {
667  fUnAllocated->Sort();
668  PDB(kPacketizer,2) {
669  std::cout << "TPacketizer::NextUnAllocNode()" << std::endl;
670  fUnAllocated->Print();
671  }
672 
673  TFileNode *fn = (TFileNode*) fUnAllocated->First();
674  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
675  PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
676  fMaxSlaveCnt);
677  fn = 0;
678  }
679 
680  return fn;
681 }
682 
683 ////////////////////////////////////////////////////////////////////////////////
684 /// Remove unallocated node.
685 
686 void TPacketizer::RemoveUnAllocNode(TFileNode * node)
687 {
688  fUnAllocated->Remove(node);
689 }
690 
691 ////////////////////////////////////////////////////////////////////////////////
692 /// Get next active file.
693 
694 TPacketizer::TFileStat *TPacketizer::GetNextActive()
695 {
696  TFileNode *node;
697  TFileStat *file = 0;
698 
699  while (file == 0 && ((node = NextActiveNode()) != 0)) {
700  file = node->GetNextActive();
701  if (file == 0) RemoveActiveNode(node);
702  }
703 
704  return file;
705 }
706 
707 ////////////////////////////////////////////////////////////////////////////////
708 /// Get next active node.
709 
710 TPacketizer::TFileNode *TPacketizer::NextActiveNode()
711 {
712  fActive->Sort();
713  PDB(kPacketizer,2) {
714  Printf("TPacketizer::NextActiveNode : ----------------------");
715  fActive->Print();
716  }
717 
718  TFileNode *fn = (TFileNode*) fActive->First();
719  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
720  PDB(kPacketizer,1)
721  Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
722  fn = 0;
723  }
724 
725  return fn;
726 }
727 
728 ////////////////////////////////////////////////////////////////////////////////
729 /// Remove file from the list of actives.
730 
731 void TPacketizer::RemoveActive(TFileStat *file)
732 {
733  TFileNode *node = file->GetNode();
734 
735  node->RemoveActive(file);
736  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
737 }
738 
739 ////////////////////////////////////////////////////////////////////////////////
740 /// Remove node from the list of actives.
741 
742 void TPacketizer::RemoveActiveNode(TFileNode *node)
743 {
744  fActive->Remove(node);
745 }
746 
747 ////////////////////////////////////////////////////////////////////////////////
748 /// Reset the internal datastructure for packet distribution.
749 
751 {
752  fUnAllocated->Clear();
754 
755  fActive->Clear();
756 
757  TIter files(fFileNodes);
758  TFileNode *fn;
759  while ((fn = (TFileNode*) files.Next()) != 0) {
760  fn->Reset();
761  }
762 
763  TIter slaves(fSlaveStats);
764  TObject *key;
765  while ((key = slaves.Next()) != 0) {
766  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
767  if (slstat) {
768  fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
769  if (fn != 0 ) {
770  slstat->SetFileNode(fn);
771  fn->IncMySlaveCnt();
772  }
773  slstat->fCurFile = 0;
774  } else {
775  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
776  }
777  }
778 }
779 
780 ////////////////////////////////////////////////////////////////////////////////
781 /// Check existence of file/dir/tree and get number of entries.
782 /// Assumes the files have been setup.
/// For each unallocated file a worker is picked (preferring a file on the
/// worker's own node). If the number of entries of the element is not already
/// known, a request is sent to the worker and the answer is collected through
/// a TMonitor select loop; otherwise the element is updated right away.
/// 'dset'   : dataset being validated; flagged kSomeInvalid if an element fails
/// 'slaves' : workers used for the validation
/// 'maxent' : if negative, workers keep being re-queued after each answer;
///            otherwise used to decide when enough entries have been validated
/// 'byfile' : if kTRUE, the number of additional files to open is estimated
///            from the average entries per file seen so far
/// On a select interruption, invalid socket, failed receive or kPROOF_FATAL
/// message, fValid is set to kFALSE. At the end the per-element entry counts
/// stored via SetTDSetOffset() are turned into cumulative offsets.
///
/// NOTE(review): the numbering embedded in this listing skips lines 861, 889
/// and 1060 inside this function; statements appear to be missing there.
783 
784 void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
785 {
786  TMap slaves_by_sock;
787  TMonitor mon;
788  TList workers;
789 
790 
791  // Setup the communication infrastructure
792 
793  workers.AddAll(slaves);
794  TIter si(slaves);
795  TSlave *slm = 0;
796  while ((slm = (TSlave*)si.Next()) != 0) {
797  PDB(kPacketizer,3)
798  Info("ValidateFiles","socket added to monitor: %p (%s)",
799  slm->GetSocket(), slm->GetName());
800  mon.Add(slm->GetSocket());
801  slaves_by_sock.Add(slm->GetSocket(), slm);
802  PDB(kPacketizer,1)
803  Info("ValidateFiles",
804  "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
805  }
806 
 // Sockets are activated one by one, only while a request is pending
807  mon.DeActivateAll();
808 
809  ((TProof*)gProof)->DeActivateAsyncInput();
810 
811  // Some monitoring systems (TXSocketHandler) need to know this
812  ((TProof*)gProof)->fCurrentMonitor = &mon;
813 
814  // Preparing for client notification
815  TString msg("Validating files");
816  UInt_t n = 0;
817  UInt_t tot = dset->GetListOfElements()->GetSize();
818  Bool_t st = kTRUE;
819 
 // totent: entries validated through worker replies; nopenf: number of
 // files those replies refer to
820  Long64_t totent = 0, nopenf = 0;
821  while (kTRUE) {
822 
823  // send work
 // 'workers' is the queue of workers free to take a validation request
824  while( TSlave *s = (TSlave*)workers.First() ) {
825 
826  workers.Remove(s);
827 
828  // find a file
829 
830  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
831  if (!slstat) {
832  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
833  continue;
834  }
835  TFileNode *node = 0;
836  TFileStat *file = 0;
837 
838  // try its own node first
839  if ( (node = slstat->GetFileNode()) != 0 ) {
840  file = GetNextUnAlloc(node);
841  if ( file == 0 ) {
842  slstat->SetFileNode(0);
843  }
844  }
845 
846  // look for a file on any other node if necessary
847  if (file == 0) {
848  file = GetNextUnAlloc();
849  }
850 
851  if ( file != 0 ) {
852  // files are done right away
853  RemoveActive(file);
854 
855  slstat->fCurFile = file;
856  TDSetElement *elem = file->GetElement();
857  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
 // Ask the worker only if the entries are unknown or the element
 // has no title yet
858  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
859  // This is decremented when we get the reply
860  file->GetNode()->IncSlaveCnt(slstat->GetName());
 // NOTE(review): listing line 861 is absent; 'm' is used below but
 // its declaration is not visible in this listing.
862  m << dset->IsTree()
863  << TString(elem->GetFileName())
864  << TString(elem->GetDirectory())
865  << TString(elem->GetObjName());
866 
867  s->GetSocket()->Send( m );
868  mon.Activate(s->GetSocket());
869  PDB(kPacketizer,2)
870  Info("ValidateFiles",
871  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
872  s->GetOrdinal(), s->GetName(), s->GetSocket(),
873  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
874  elem->GetDirectory(), elem->GetObjName());
875  } else {
876  // Fill the info
877  elem->SetTDSetOffset(entries);
878  if (entries > 0) {
879  // Most likely valid
880  elem->SetValid();
881  if (!elem->GetEntryList()) {
882  if (elem->GetFirst() > entries) {
883  Error("ValidateFiles",
884  "first (%lld) higher then number of entries (%lld) in %s",
885  elem->GetFirst(), entries, elem->GetFileName());
886  // disable element
887  slstat->fCurFile->SetDone();
888  elem->Invalidate();
 // NOTE(review): listing line 889 is absent; the matching branch
 // in the reply handling below also flags 'dset' with
 // TDSet::kSomeInvalid at this point.
890  }
891  if (elem->GetNum() == -1) {
892  elem->SetNum(entries - elem->GetFirst());
893  } else if (elem->GetFirst() + elem->GetNum() > entries) {
894  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
895  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
896  entries, elem->GetFileName());
897  elem->SetNum(entries - elem->GetFirst());
898  }
899  PDB(kPacketizer,2)
900  Info("ValidateFiles",
901  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
902  }
903  }
904  // Notify the client
905  n++;
906  gProof->SendDataSetStatus(msg, n, tot, st);
907 
908  // This worker is ready for the next validation
909  workers.Add(s);
910  }
911  }
912  }
913 
914  // Check if there is anything to wait for
915  if (mon.GetActive() == 0) {
916  if (byfile && maxent > 0 && totent > 0) {
917  // How many files do we still need ?
 // Estimate from the average number of entries per file so far
918  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
919  if (nrestf <= 0 && maxent > totent) nrestf = 1;
920  if (nrestf > 0) {
921  PDB(kPacketizer,3)
922  Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
923  maxent, totent, nopenf, nrestf);
 // Re-queue at most 'nrestf' workers (one file each)
924  si.Reset();
925  while ((slm = (TSlave *) si.Next()) && nrestf--) {
926  workers.Add(slm);
927  }
928  continue;
929  } else {
930  PDB(kPacketizer,3)
931  Info("ValidateFiles", "no need to validate more files");
932  break;
933  }
934  } else {
935  break;
936  }
937  }
938 
939  PDB(kPacketizer,3) {
940  Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
941  TList *act = mon.GetListOfActives();
942  TIter next(act);
943  TSocket *s = 0;
944  while ((s = (TSocket*) next())) {
945  Info("ValidateFiles", "found sck: %p", s);
946  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
947  if (sl)
948  Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
949  }
950  delete act;
951  }
952 
 // Wait for the next worker to answer
953  TSocket *sock = mon.Select();
954  // If we have been interrupted break
955  if (!sock) {
956  Error("ValidateFiles", "selection has been interrupted - STOP");
957  mon.DeActivateAll();
958  fValid = kFALSE;
959  break;
960  }
961  mon.DeActivate(sock);
962 
963  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
964 
 // NOTE(review): 'slave' is dereferenced below without checking that the
 // socket was found in the map.
965  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
966  if (!sock->IsValid()) {
967  // A socket got invalid during validation
968  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
969  slave->GetOrdinal(), slave->GetName());
970  ((TProof*)gProof)->MarkBad(slave);
971  fValid = kFALSE;
972  break;
973  }
974 
975  TMessage *reply;
976 
977  if ( sock->Recv(reply) <= 0 ) {
978  // Help! lost a slave?
979  ((TProof*)gProof)->MarkBad(slave);
980  fValid = kFALSE;
981  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
982  slave->GetOrdinal(), slave->GetName());
983  continue;
984  }
985 
986  if (reply->What() != kPROOF_GETENTRIES) {
987  // Not what we want: handover processing to the central machinery
 // 'what' is saved first because 'reply' is handed over to
 // HandleInputMessage() and not used afterwards
988  Int_t what = reply->What();
989  ((TProof*)gProof)->HandleInputMessage(slave, reply);
990  if (what == kPROOF_FATAL) {
991  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
992  slave->GetOrdinal(), slave->GetName());
993  fValid = kFALSE;
994  } else {
995  // Reactivate the socket
996  mon.Activate(sock);
997  }
998  // Get next message
999  continue;
1000  }
1001 
 // kPROOF_GETENTRIES reply: update the element the worker was asked about
 // NOTE(review): 'reply' is not deleted on this path in the visible code;
 // confirm who owns the message returned by Recv().
1002  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1003  TDSetElement *e = slavestat->fCurFile->GetElement();
1004  slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1005  Long64_t entries;
1006 
1007  (*reply) >> entries;
1008 
1009  // Extract object name, if there
1010  if ((reply->BufferSize() > reply->Length())) {
1011  TString objname;
1012  (*reply) >> objname;
1013  e->SetTitle(objname);
1014  }
1015 
 // Temporarily store the entry count; converted to an offset at the end
1016  e->SetTDSetOffset(entries);
1017  if ( entries > 0 ) {
1018 
1019  // This dataset element is most likely valid
1020  e->SetValid();
1021 
1022  //if (!e->GetEventList()) {
1023  if (!e->GetEntryList()){
1024  if ( e->GetFirst() > entries ) {
1025  Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
1026  e->GetFirst(), entries, e->GetFileName());
1027 
1028  // Invalidate the element
1029  slavestat->fCurFile->SetDone();
1030  e->Invalidate();
1031  dset->SetBit(TDSet::kSomeInvalid);
1032  }
1033 
 // Resolve "all entries" (-1) or clip a range exceeding the file
1034  if ( e->GetNum() == -1 ) {
1035  e->SetNum( entries - e->GetFirst() );
1036  } else if ( e->GetFirst() + e->GetNum() > entries ) {
1037  Error("ValidateFiles",
1038  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1039  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1040  e->SetNum(entries - e->GetFirst());
1041  }
1042  }
1043 
1044  // Count
1045  totent += entries;
1046  nopenf++;
1047 
1048  // Notify the client
1049  n++;
1050  gProof->SendDataSetStatus(msg, n, tot, st);
1051 
1052  } else {
1053 
1054  Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
1055  //
1056  // Need to fix this with a user option to allow incomplete file sets (rdm)
1057  //
1058  //fValid = kFALSE; // all element must be readable!
1059  if (gProofServ) {
 // NOTE(review): listing line 1060 is absent; 'm' is used below but
 // its declaration is not visible in this listing.
1061  m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
1062  gProofServ->GetSocket()->Send(m);
1063  }
1064 
1065  // Invalidate the element
1066  e->Invalidate();
1067  dset->SetBit(TDSet::kSomeInvalid);
1068  }
1069  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1070 
1071  // Ready for the next job, unless we have enough files
1072  if (maxent < 0 || ((totent < maxent) && !byfile))
1073  workers.Add(slave);
1074  }
1075 
1076  // report std. output from slaves??
1077 
1078  ((TProof*)gProof)->ActivateAsyncInput();
1079 
1080  // This needs to be reset
1081  ((TProof*)gProof)->fCurrentMonitor = 0;
1082 
1083  // No reason to continue if invalid
1084  if (!fValid)
1085  return;
1086 
1087 
1088  // compute the offset for each file element
 // Each element currently holds its own entry count in the offset field:
 // replace it by the sum of the counts of the elements preceding it.
1089  Long64_t offset = 0;
1090  Long64_t newOffset = 0;
1091  TIter next(dset->GetListOfElements());
1092  TDSetElement *el;
1093  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1094  newOffset = offset + el->GetTDSetOffset();
1095  el->SetTDSetOffset(offset);
1096  offset = newOffset;
1097  }
1098 }
1099 
1100 ////////////////////////////////////////////////////////////////////////////////
1101 /// Get entries processed by the specified slave.
     /// Looks up the TSlaveStat stored in fSlaveStats under the key `slave` and
     /// returns its GetEntriesProcessed() value.
     /// Returns 0 when fSlaveStats is not set or when the map has no value for
     /// `slave`.
     /// NOTE(review): the declaration line (1103) is absent from this listing.
1102 
1104 {
     // No per-worker bookkeeping available: nothing has been recorded
1105  if ( fSlaveStats == 0 ) return 0;
1106 
1107  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1108 
     // Worker not known to this packetizer
1109  if ( slstat == 0 ) return 0;
1110 
1111  return slstat->GetEntriesProcessed();
1112 }
1113 
1114 ////////////////////////////////////////////////////////////////////////////////
1115 /// Get Estimation of the current rate; just summing the current rates of
1116 /// the active workers
     /// On return `all` is kTRUE only if every entry found in fSlaveStats has a
     /// progress status and has already processed at least one entry; it is set
     /// to kFALSE as soon as one worker does not satisfy this and is therefore
     /// left out of the sum.
     /// When fSlaveStats is not set or is empty the result is 0 and `all` stays
     /// kTRUE.
     /// NOTE(review): the declaration line (1118) is absent from this listing.
1117 
1119 {
     // Optimistic default; cleared below by the first worker that cannot contribute
1120  all = kTRUE;
1121  // Loop over the workers
1122  Float_t currate = 0.;
1123  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1124  TIter nxw(fSlaveStats);
1125  TObject *key;
1126  while ((key = nxw()) != 0) {
1127  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
     // Only workers with a progress status and at least one processed entry are summed
1128  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1129  // Sum-up the current rates
1130  currate += slstat->GetProgressStatus()->GetCurrentRate();
1131  } else {
     // This worker did not contribute: the sum does not cover all workers
1132  all = kFALSE;
1133  }
1134  }
1135  }
1136  // Done
1137  return currate;
1138 }
1139 
1140 ////////////////////////////////////////////////////////////////////////////////
1141 /// Get next packet
     /// Returns the next element to be processed by worker `sl`, or 0 when the
     /// packetizer is not valid, when fStop is set, or when no file is left to
     /// hand out.
     /// If the worker still holds a packet from a previous call (fCurElem), that
     /// packet is first added to fPackets and the statistics about it are read
     /// from the message `r` and accumulated in the worker and global progress
     /// status.
     /// The returned element stays referenced by the worker's fCurElem.
     /// NOTE(review): lines 1143 (declaration), 1170 and 1221 are absent from
     /// this listing.
1142 
1144 {
1145  if ( !fValid ) {
1146  return 0;
1147  }
1148 
1149  // Find worker
1150 
1151  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1152 
1153  R__ASSERT( slstat != 0 );
1154 
1155  PDB(kPacketizer,1)
1156  Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1157  // update stats & free old element
1158 
1159  Bool_t firstPacket = kFALSE;
1160  if ( slstat->fCurElem != 0 ) {
1161  Double_t latency = 0., proctime = 0., proccpu = 0.;
1162  Long64_t bytesRead = -1;
1163  Long64_t totalEntries = -1;
1164  Long64_t totev = 0;
     // Default: assume the whole previous packet was processed; refined below
     // from what the worker reports
1165  Long64_t numev = slstat->fCurElem->GetNum();
1166 
     // Keep track of the packet that was handed out in the previous call
1167  fPackets->Add(slstat->fCurElem);
1168 
     // Workers with protocol > 18 send the latency followed by a status object;
     // older ones send the individual numbers (see the else branch)
1169  if (sl->GetProtocol() > 18) {
     // NOTE(review): line 1170, which presumably declares `status`, is absent
     // from this listing
1171  (*r) >> latency;
1172  (*r) >> status;
1173 
1174  // Calculate the progress made in the last packet
1175  TProofProgressStatus *progress = 0;
1176  if (status) {
1177  // update the worker status
     // Entries for this packet: difference between the entries reported in
     // `status` and those already recorded for this worker
1178  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1179  progress = slstat->AddProcessed(status);
1180  if (progress) {
1181  // (*fProgressStatus) += *progress;
1182  proctime = progress->GetProcTime();
1183  proccpu = progress->GetCPUTime();
1184  totev = status->GetEntries(); // for backward compatibility
1185  bytesRead = progress->GetBytesRead();
     // The object returned by AddProcessed is released here
1186  delete progress;
1187  }
1188  delete status;
1189  } else
1190  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
1191  } else {
1192 
1193  (*r) >> latency >> proctime >> proccpu;
1194 
1195  // only read new info if available
1196  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1197  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1198  if (r->BufferSize() > r->Length()) (*r) >> totev;
1199 
     // Same difference as in the other branch, based on the total sent by the
     // worker; the worker progress status is updated by hand here
1200  numev = totev - slstat->GetEntriesProcessed();
1201  if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1202  if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1203  if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1204  }
1205 
     // Accumulate the same increments into the global progress status
1206  if (fProgressStatus) {
1207  if (numev > 0) fProgressStatus->IncEntries(numev);
1208  if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1209  if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1210  }
1211  PDB(kPacketizer,2)
1212  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1213  sl->GetOrdinal(), sl->GetName(),
1214  numev, latency, proctime, proccpu, bytesRead);
1215 
1216  if (gPerfStats)
1217  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1218  numev, latency, proctime, proccpu, bytesRead);
1219 
     // The worker no longer holds a packet
1220  slstat->fCurElem = 0;
     // NOTE(review): line 1221, which opens the block closed at 1224, is absent
     // from this listing; the condition guarding the two statements below
     // cannot be read here
1222  HandleTimer(0); // Send last timer message
1223  delete fProgress; fProgress = 0;
1224  }
1225  } else {
     // No previous packet for this worker: the one created below is its first
1226  firstPacket = kTRUE;
1227  }
1228 
     // Stop requested: send a progress message and hand out no more packets
1229  if ( fStop ) {
1230  HandleTimer(0);
1231  return 0;
1232  }
1233 
1234  // get a file if needed
1235 
1236  TFileStat *file = slstat->fCurFile;
1237 
     // The current file has been completely handed out: detach the worker from
     // it and record the event with kFALSE as last argument
1238  if ( file != 0 && file->IsDone() ) {
1239  file->GetNode()->DecSlaveCnt(slstat->GetName());
1240  if (gPerfStats)
1241  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1242  file->GetElement()->GetFileName(), kFALSE);
1243  file = 0;
1244  }
1245  // Reset the current file field
1246  slstat->fCurFile = file;
1247 
     // Search order for a new file: the worker's own node, then any unallocated
     // file, then the files already being processed
1248  if (!file) {
1249 
1250  // Try its own node first
1251  if (slstat->GetFileNode() != 0) {
1252  file = GetNextUnAlloc(slstat->GetFileNode());
1253  if (!file) {
     // Nothing left on the worker's own node: forget it so that it is not
     // tried again
1254  slstat->SetFileNode(0);
1255  }
1256  }
1257 
1258  // try to find an unused filenode first
1259  if (!file) {
1260  file = GetNextUnAlloc();
1261  }
1262 
1263  // then look at the active filenodes
1264  if (!file) {
1265  file = GetNextActive();
1266  }
1267 
     // No file left anywhere: no more work for this worker
1268  if (!file) return 0;
1269 
     // Attach the worker to the new file and record the event with kTRUE as
     // last argument
1270  slstat->fCurFile = file;
1271  file->GetNode()->IncSlaveCnt(slstat->GetName());
1272  if (gPerfStats)
1273  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1274  file->GetNode()->GetName(),
1275  file->GetElement()->GetFileName(), kTRUE);
1276  }
1277 
1278  // get a packet
1279 
1280  TDSetElement *base = file->GetElement();
     // Packet size: fPacketSize scaled by the worker performance index relative
     // to fMaxPerfIdx, with a minimum of one entry
1281  Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
1282  if (num < 1) num = 1;
1283 
     // [first, last) is what remains to be handed out of the element's range
1284  Long64_t first = file->GetNextEntry();
1285  Long64_t last = base->GetFirst() + base->GetNum();
1286 
     // The packet would reach or pass the end of the range: hand out exactly
     // what is left and mark the file as done
1287  if ( first + num >= last ) {
1288  num = last - first;
1289  file->SetDone(); // done
1290 
1291  // delete file from active list (unalloc list is single pass, no delete needed)
1292  RemoveActive(file);
1293 
1294  } else {
1295  file->MoveNextEntry(num);
1296  }
1297 
1298 
1299  slstat->fCurElem = CreateNewPacket(base, first, num);
     // Propagate the entry list of the base element, if any, for this range
1300  if (base->GetEntryList())
1301  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1302 
1303  // Flag the first packet of a new run (dataset)
1304  if (firstPacket)
1305  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1306  else
1307  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1308 
1309  PDB(kPacketizer,2)
1310  Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1311 
1312  return slstat->fCurElem;
1313 }
1314 
1315 ////////////////////////////////////////////////////////////////////////////////
1316 /// Return the number of workers still processing
     /// A worker is counted when the TSlaveStat stored for it in fSlaveStats has
     /// a non-null fCurFile, i.e. a file is currently attached to it.
     /// NOTE(review): the declaration line (1318) is absent from this listing.
1317 
1319 {
1320  Int_t actw = 0;
     // Iterate over the keys of the map and fetch the associated statistics
1321  TIter nxw(fSlaveStats);
1322  TObject *key;
1323  while ((key = nxw())) {
1324  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1325  if (wrkstat && wrkstat->fCurFile) actw++;
1326  }
1327  // Done
1328  return actw;
1329 }
const char * GetHost() const
Definition: TUrl.h:76
virtual Int_t GetEntries() const
Definition: TCollection.h:92
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
const char * GetOrdinal() const
Definition: TSlave.h:135
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:432
void Add(THist< DIMENSION, PRECISIONA > &to, THist< DIMENSION, PRECISIONB > &from)
Definition: THist.h:335
long long Long64_t
Definition: RtypesCore.h:69
TSocket * GetSocket() const
Definition: TSlave.h:138
Int_t fPacketAsAFraction
Definition: TPacketizer.h:62
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
virtual Bool_t IsValid() const
Definition: TSocket.h:162
Int_t GetActiveWorkers()
Return the number of workers still processing.
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:131
This class represents a WWW compatible URL.
Definition: TUrl.h:41
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
Definition: TDSet.h:153
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
const char * GetProtocol() const
Definition: TUrl.h:73
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Long64_t GetBytesRead() const
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:52
#define R__ASSERT(e)
Definition: TError.h:98
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Basic string class.
Definition: TString.h:137
TList * fPackets
Definition: TPacketizer.h:46
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual ~TPacketizer()
Destructor.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
Definition: TDSet.cxx:264
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
Bool_t GetValid() const
Definition: TDSet.h:121
void Reset()
Definition: TCollection.h:161
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
Double_t GetProcTime() const
const char * GetObjName() const
Definition: TDSet.h:122
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:770
virtual Int_t GetN() const
Definition: TEventList.h:58
Long64_t GetNum() const
Definition: TDSet.h:116
const char * Data() const
Definition: TString.h:349
Double_t GetCPUTime() const
#define SafeDelete(p)
Definition: RConfig.h:436
Long64_t GetTDSetOffset() const
Definition: TDSet.h:130
Bool_t IsTree() const
Definition: TDSet.h:225
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:288
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
#define PDB(mask, level)
Definition: TProofDebug.h:58
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
void IncEntries(Long64_t entries=1)
void IncBytesRead(Long64_t bytesRead)
virtual void AddAll(const TCollection *col)
TList * GetListOfElements() const
Definition: TDSet.h:231
void Info(const char *location, const char *msgfmt,...)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
Long64_t fPacketSize
Definition: TPacketizer.h:55
TList * fFileNodes
Definition: TPacketizer.h:51
void Reset()
Reset the internal datastructure for packet distribution.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
void Error(const char *location, const char *msgfmt,...)
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
TProofProgressStatus * fProgressStatus
Long64_t GetFirst() const
Definition: TDSet.h:114
Bool_t IsValid() const
Definition: TUrl.h:88
virtual Bool_t IsSortable() const
Definition: TObject.h:139
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:213
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:594
A doubly linked list.
Definition: TList.h:47
static const char * what
Definition: stlLoader.cc:6
TList * fUnAllocated
Definition: TPacketizer.h:52
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
void SetLastEntries(Long64_t entries)
TSocket * GetSocket() const
Definition: TProofServ.h:269
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9903
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
const char * GetFileName() const
Definition: TDSet.h:113
ROOT::R::TRInterface & r
Definition: Object.C:4
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Int_t fMaxPerfIdx
Definition: TPacketizer.h:58
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:674
TObject * Next()
Definition: TCollection.h:158
TClass * IsA() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
char * Form(const char *fmt,...)
TObject * GetEntryList() const
Definition: TDSet.h:133
void Invalidate()
Definition: TDSet.h:136
TFileNode * NextUnAllocNode()
Get next unallocated node.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Bool_t IsNull() const
Definition: TString.h:387
void Reset(Detail::TBranchProxy *x)
void Warning(const char *location, const char *msgfmt,...)
Long64_t GetEntries() const
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
Long_t fMaxSlaveCnt
Definition: TPacketizer.h:60
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:149
#define Printf
Definition: TGeoToOCC.h:18
ClassImp(TPacketizer) TPacketizer
Constructor.
#define gPerfStats
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
const char * GetDataSet() const
Definition: TDSet.h:124
void SetHost(const char *host)
Definition: TUrl.h:93
long Long_t
Definition: RtypesCore.h:50
Int_t GetPerfIdx() const
Definition: TSlave.h:136
R__EXTERN TProof * gProof
Definition: TProof.h:1110
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
virtual Int_t GetSize() const
Definition: TCollection.h:95
Bool_t fHeuristicPSiz
Definition: TPacketizer.h:70
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
void SetValid()
Definition: TDSet.h:137
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
Definition: TProof.h:339
void SetNum(Long64_t num)
Definition: TDSet.h:120
UInt_t What() const
Definition: TMessage.h:80
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:348
Int_t GetProtocol() const
Definition: TSlave.h:137
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
virtual Long64_t GetN() const
Definition: TEntryList.h:77
Mother of all ROOT objects.
Definition: TObject.h:58
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:556
Int_t BufferSize() const
Definition: TBuffer.h:92
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:359
virtual void Add(TObject *obj)
Definition: TList.h:81
Int_t Length() const
Definition: TBuffer.h:94
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:218
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:167
TList * fActive
Definition: TPacketizer.h:53
Definition: TSlave.h:50
TFileStat * GetNextActive()
Get next active file.
const Bool_t kTRUE
Definition: Rtypes.h:91
void SetFirst(Long64_t first)
Definition: TDSet.h:115
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const Int_t n
Definition: legend1.C:16
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:256
Bool_t fDefMaxWrkNode
Definition: TPacketizer.h:71
TFileNode * NextActiveNode()
Get next active node.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904