ROOT  6.06/09
Reference Guide
TPacketizerAdaptive.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TPacketizerAdaptive //
15 // //
16 // This packetizer is based on TPacketizer but uses different //
17 // load-balancing algorithms and data structures. //
18 // Two main improvements in the load-balancing strategy: //
19 // - First one was to change the order in which the files are assigned //
20 // to the computing nodes in such a way that network transfers are //
21 // evenly distributed in the query time. Transfer of the remote files //
22 // was often becoming a bottleneck at the end of a query. //
23 // - The other improvement is the use of time-based packet size. We //
24 // measure the processing rate of all the nodes and calculate the //
25 // packet size, so that it takes certain amount of time. In this way //
26 // packetizer prevents the situation where the query can't finish //
27 // because of one slow node. //
28 // //
29 // The data structures: TFileStat, TFileNode and TSlaveStat are //
30 // enriched + changed and TFileNode::Compare method is changed. //
31 // //
32 //////////////////////////////////////////////////////////////////////////
33 
34 
35 #include "TPacketizerAdaptive.h"
36 
37 #include "Riostream.h"
38 #include "TDSet.h"
39 #include "TError.h"
40 #include "TEnv.h"
41 #include "TEntryList.h"
42 #include "TEventList.h"
43 #include "TMap.h"
44 #include "TMessage.h"
45 #include "TMonitor.h"
46 #include "TNtupleD.h"
47 #include "TObject.h"
48 #include "TParameter.h"
49 #include "TPerfStats.h"
50 #include "TProofDebug.h"
51 #include "TProof.h"
52 #include "TProofServ.h"
53 #include "TSlave.h"
54 #include "TSocket.h"
55 #include "TSortedList.h"
56 #include "TUrl.h"
57 #include "TClass.h"
58 #include "TRandom.h"
59 #include "TMath.h"
60 #include "TObjString.h"
61 #include "TList.h"
62 
63 //
64 // The following three utility classes manage the state of the
65 // work to be performed and the slaves involved in the process.
66 // A list of TFileNode(s) describes the hosts with files, each
67 // has a list of TFileStat(s) keeping the state for each TDSet
68 // element (file).
69 //
70 // The list of TSlaveStat(s) keeps track of the work done (or being
71 // done) by each slave.
72 //
73 
74 
75 //------------------------------------------------------------------------------
76 
77 class TPacketizerAdaptive::TFileStat : public TObject {
78 
79 private:
80  Bool_t fIsDone; // is this element processed
81  TFileNode *fNode; // my FileNode
82  TDSetElement *fElement; // location of the file and its range
83  Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing
84 
85 public:
86  TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
87 
88  Bool_t IsDone() const {return fIsDone;}
89  Bool_t IsSortable() const { return kTRUE; }
90  void SetDone() {fIsDone = kTRUE;}
91  TFileNode *GetNode() const {return fNode;}
92  TDSetElement *GetElement() const {return fElement;}
93  Long64_t GetNextEntry() const {return fNextEntry;}
94  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
95 
96  // This method is used to keep a sorted list of remaining files to be processed
97  Int_t Compare(const TObject* obj) const
98  {
99  // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
100  // and 1 if elem.entries < obj.elem.entries.
101  const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
102  if (fst && GetElement() && fst->GetElement()) {
103  Long64_t ent = GetElement()->GetNum();
104  Long64_t entfst = fst->GetElement()->GetNum();
105  if (ent > 0 && entfst > 0) {
106  if (ent > entfst) {
107  return 1;
108  } else if (ent < entfst) {
109  return -1;
110  } else {
111  return 0;
112  }
113  }
114  }
115  // No info: assume equal (no change in order)
116  return 0;
117  }
118  void Print(Option_t * = 0) const
119  { // Notify file name and entries
120  Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
121  fElement ? fElement->GetNum() : -1);
122  }
123 };
124 
125 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
126  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
127 {
128  // Constructor: add to the global list
129  if (files) files->Add(this);
130 }
131 
132 //------------------------------------------------------------------------------
133 
134 // a class describing a file node as a part of a session
135 class TPacketizerAdaptive::TFileNode : public TObject {
136 
137 private:
138  TString fNodeName; // FQDN of the node
139  TList *fFiles; // TDSetElements (files) stored on this node
140  TObject *fUnAllocFileNext; // cursor in fFiles
141  TList *fActFiles; // files with work remaining
142  TObject *fActFileNext; // cursor in fActFiles
143  Int_t fMySlaveCnt; // number of slaves running on this node
144  // (which can process remote files)
145  Int_t fExtSlaveCnt; // number of external slaves processing
146  // files on this node
147  Int_t fRunSlaveCnt; // total number of slaves processing files
148  // on this node
149  Long64_t fProcessed; // number of events processed on this node
150  Long64_t fEvents; // number of entries in files on this node
151 
152  Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
153 
154  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer)
155 
156 public:
157  TFileNode(const char *name, Int_t strategy, TSortedList *files);
158  ~TFileNode() { delete fFiles; delete fActFiles; }
159 
160  void IncMySlaveCnt() { fMySlaveCnt++; }
161  Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
162  void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
163  void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
164  Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
165  void IncRunSlaveCnt() { fRunSlaveCnt++; }
166  void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
167  Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
168  Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
169  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
170  Bool_t IsSortable() const { return kTRUE; }
171  Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
172  void IncProcessed(Long64_t nEvents)
173  { fProcessed += nEvents; }
174  Long64_t GetProcessed() const { return fProcessed; }
175  void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
176  // this method is used by Compare() it adds 1, so it returns a number that
177  // would be true if one more slave is added.
178  Long64_t GetEventsLeftPerSlave() const
179  { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
180  void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
181  const char *GetName() const { return fNodeName.Data(); }
182  Long64_t GetNEvents() const { return fEvents; }
183 
184  void Print(Option_t * = 0) const
185  {
186  TFileStat *fs = 0;
187  TDSetElement *e = 0;
188  Int_t nn = 0;
189  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
190  Printf("+++ TFileNode: %s +++", fNodeName.Data());
191  Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
192  Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
193  Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
194  if (fFiles && fFiles->GetSize() > 0) {
195  TIter nxf(fFiles);
196  while ((fs = (TFileStat *) nxf())) {
197  if ((e = fs->GetElement())) {
198  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
199  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
200  } else {
201  Printf("+++ #%d: no element! ", ++nn);
202  }
203  }
204  }
205  Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
206  if (fActFiles && fActFiles->GetSize() > 0) {
207  TIter nxaf(fActFiles);
208  while ((fs = (TFileStat *) nxaf())) {
209  if ((e = fs->GetElement())) {
210  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
211  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
212  } else {
213  Printf("+++ #%d: no element! ", ++nn);
214  }
215  }
216  }
217  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
218  }
219 
220  void Add(TDSetElement *elem, Bool_t tolist)
221  {
222  TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
223  TFileStat *f = new TFileStat(this, elem, files);
224  fFiles->Add(f);
225  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
226  }
227 
228  TFileStat *GetNextUnAlloc()
229  {
230  TObject *next = fUnAllocFileNext;
231 
232  if (next != 0) {
233  // make file active
234  fActFiles->Add(next);
235  if (fActFileNext == 0) fActFileNext = fActFiles->First();
236 
237  // move cursor
238  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
239  }
240  return (TFileStat *) next;
241  }
242 
243  TFileStat *GetNextActive()
244  {
245  TObject *next = fActFileNext;
246 
247  if (fActFileNext != 0) {
248  fActFileNext = fActFiles->After(fActFileNext);
249  if (fActFileNext == 0) fActFileNext = fActFiles->First();
250  }
251 
252  return (TFileStat *) next;
253  }
254 
255  void RemoveActive(TFileStat *file)
256  {
257  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
258  fActFiles->Remove(file);
259  if (fFilesToProcess) fFilesToProcess->Remove(file);
260  if (fActFileNext == 0) fActFileNext = fActFiles->First();
261  }
262 
263  Int_t Compare(const TObject *other) const
264  {
265  // Must return -1 if this is smaller than obj, 0 if objects are equal
266  // and 1 if this is larger than obj.
267  // smaller means more needing a new worker.
268  // Two cases are considered depending on
269  // relation between harddrive speed and network bandwidth.
270 
271  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
272  if (!obj) {
273  Error("Compare", "input is not a TPacketizer::TFileNode object");
274  return 0;
275  }
276 
277  // how many more events it has than obj
278 
279  if (fStrategy == 1) {
280  // The default adaptive strategy.
281  Int_t myVal = GetRunSlaveCnt();
282  Int_t otherVal = obj->GetRunSlaveCnt();
283  if (myVal < otherVal) {
284  return -1;
285  } else if (myVal > otherVal) {
286  return 1;
287  } else {
288  // if this has more events to process than obj
289  if ((fEvents - fProcessed) >
290  (obj->GetNEvents() - obj->GetProcessed())) {
291  return -1;
292  } else {
293  return 1;
294  }
295  }
296  } else {
297  Int_t myVal = GetSlaveCnt();
298  Int_t otherVal = obj->GetSlaveCnt();
299  if (myVal < otherVal) {
300  return -1;
301  } else if (myVal > otherVal) {
302  return 1;
303  } else {
304  return 0;
305  }
306  }
307  }
308 
309  void Reset()
310  {
311  fUnAllocFileNext = fFiles->First();
312  fActFiles->Clear();
313  fActFileNext = 0;
314  fExtSlaveCnt = 0;
315  fMySlaveCnt = 0;
316  fRunSlaveCnt = 0;
317  }
318 };
319 
320 
321 TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
322  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
323  fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
324  fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
325  fStrategy(strategy), fFilesToProcess(files)
326 {
327  // Constructor
328 
329  fFiles->SetOwner();
330  fActFiles->SetOwner(kFALSE);
331 }
332 
333 //------------------------------------------------------------------------------
334 
// Per-worker bookkeeping: the file node associated with the worker, the
// file / element it is currently processing, the entries and processing time
// accumulated on the current file and the list of packets it has processed.
335 class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
336 
337 friend class TPacketizerAdaptive;
338 
339 private:
340  TFileNode *fFileNode; // corresponding node or 0
341  TFileStat *fCurFile; // file currently being processed
342  TDSetElement *fCurElem; // TDSetElement currently being processed
343  Long64_t fCurProcessed; // events processed in the current file
344  Float_t fCurProcTime; // proc time spent on the current file
345  TList *fDSubSet; // packets processed by this worker
346 
347 public:
348  TSlaveStat(TSlave *slave);
349  ~TSlaveStat();
350  TFileNode *GetFileNode() const { return fFileNode; }
   // Entries / processing time taken from the current status object;
   // both return -1 when there is no status object.
351  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
352  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
353  TFileStat *GetCurFile() { return fCurFile; }
354  void SetFileNode(TFileNode *node) { fFileNode = node; }
355  void UpdateRates(TProofProgressStatus *st);
   // NOTE(review): unlike the two getters above, this dereferences fStatus
   // without a null check.
356  Float_t GetAvgRate() { return fStatus->GetRate(); }
   // Rate on the current file: entries processed divided by the processing
   // time spent on it, or 0 while no time has been accumulated.
357  Float_t GetCurRate() {
358  return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
   // Events left per worker on the associated node (0 when there is none).
   // NOTE(review): TFileNode::GetEventsLeftPerSlave() returns Long64_t, so
   // the value is narrowed to Int_t here.
359  Int_t GetLocalEventsLeft() {
360  return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
361  TList *GetProcessedSubSet() { return fDSubSet; }
   // NOTE(review): this listing jumps from inner line 361 to 364; the
   // declaration of AddProcessed(), which is defined further down in the
   // file, is presumably on one of the skipped lines - confirm against the
   // original source.
364 };
365 
366 ////////////////////////////////////////////////////////////////////////////////
367 /// Constructor
368 
369 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
370  : fFileNode(0), fCurFile(0), fCurElem(0),
371  fCurProcessed(0), fCurProcTime(0)
372 {
373  fDSubSet = new TList();
374  fDSubSet->SetOwner();
375  fSlave = slave;
376  fStatus = new TProofProgressStatus();
377  // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
378  // for non existing names
379  fWrkFQDN = slave->GetName();
380  if (strcmp(slave->ClassName(), "TSlaveLite")) {
381  fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
382  // Get full name for local hosts
383  if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
384  fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
385  }
386  PDB(kPacketizer, 2)
387  Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
388 }
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 /// Cleanup
392 
393 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
394 {
395  SafeDelete(fDSubSet);
396  SafeDelete(fStatus);
397 }
398 
399 ////////////////////////////////////////////////////////////////////////////////
400 /// Update packetizer rates
401 
402 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
403 {
404  if (!st) {
405  Error("UpdateRates", "no status object!");
406  return;
407  }
408  if (fCurFile->IsDone()) {
409  fCurProcTime = 0;
410  fCurProcessed = 0;
411  } else {
412  fCurProcTime += st->GetProcTime() - GetProcTime();
413  fCurProcessed += st->GetEntries() - GetEntriesProcessed();
414  }
415  fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
416  st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
417  SafeDelete(fStatus);
418  fStatus = st;
419 }
420 
421 ////////////////////////////////////////////////////////////////////////////////
422 /// Add the current element to the fDSubSet (subset processed by this worker)
423 /// and if the status arg is given, then change the size of the packet.
424 /// return the difference (*st - *fStatus)
425 
426 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
427 {
428  if (st && fDSubSet && fCurElem) {
429  if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
430  fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
431  fDSubSet->Add(fCurElem);
432  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
433  return diff;
434  } else {
435  Error("AddProcessed", "processed subset of current elem undefined");
436  return 0;
437  }
438 }
439 
440 //------------------------------------------------------------------------------
441 
443 
444 ////////////////////////////////////////////////////////////////////////////////
445 /// Constructor
446 
448  Long64_t first, Long64_t num,
449  TList *input, TProofProgressStatus *st)
450  : TVirtualPacketizer(input, st)
451 {
452  PDB(kPacketizer,1) Info("TPacketizerAdaptive",
453  "enter (first %lld, num %lld)", first, num);
454 
455  // Init pointer members
456  fSlaveStats = 0;
457  fUnAllocated = 0;
458  fActive = 0;
459  fFileNodes = 0;
460  fMaxPerfIdx = 1;
462  fMaxEntriesRatio = 2.;
463 
464  fMaxSlaveCnt = -1;
465  fPacketAsAFraction = 4;
466  fStrategy = 1;
467  fFilesToProcess = new TSortedList;
468  fFilesToProcess->SetOwner(kFALSE);
469 
470  if (!fProgressStatus) {
471  Error("TPacketizerAdaptive", "No progress status");
472  return;
473  }
474 
475  // Attempt to synchronize the packet size with the tree cache size
476  Int_t cpsync = -1;
477  if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
478  // Check if there is a global cache-packet sync setting
479  cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
480  }
481  if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
482 
483  // Max file entries to avg allowed ratio for cache-to-packet synchronization
484  // (applies only if fCachePacketSync is true; -1. disables the bound)
485  if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
486  // Check if there is a global ratio setting
487  fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
488  }
489 
490  // The possibility to change packetizer strategy to the basic TPacketizer's
491  // one (in which workers always process their local data first).
492  Int_t strategy = -1;
493  if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
494  // Check if there is a global strategy setting
495  strategy = gEnv->GetValue("Packetizer.Strategy", 1);
496  }
497  if (strategy == 0) {
498  fStrategy = 0;
499  Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
500  } else if (strategy != 1) {
501  Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
502  }
503 
504  Long_t maxSlaveCnt = 0;
505  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
506  if (maxSlaveCnt < 0) {
507  Info("TPacketizerAdaptive",
508  "The value of PROOF_MaxSlavesPerNode must be positive");
509  maxSlaveCnt = 0;
510  }
511  } else {
512  // Try also with Int_t (recently supported in TProof::SetParameter)
513  Int_t mxslcnt = -1;
514  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
515  if (mxslcnt < 0) {
516  Info("TPacketizerAdaptive",
517  "The value of PROOF_MaxSlavesPerNode must be positive");
518  mxslcnt = 0;
519  }
520  maxSlaveCnt = (Long_t) mxslcnt;
521  }
522  }
523 
524  if (!maxSlaveCnt)
525  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
526  if (maxSlaveCnt > 0) {
527  fMaxSlaveCnt = maxSlaveCnt;
528  Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
529  fMaxSlaveCnt);
530  }
531 
532  // if forceLocal parameter is set to 1 then eliminate the cross-worker
533  // processing;
534 // This minimizes the network usage on the PROOF cluster at the expense of
535 // longer job processing times.
536  // To process successfully the session must have workers with all the data!
538  Int_t forceLocal = 0;
539  if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
540  if (forceLocal == 1)
541  fForceLocal = kTRUE;
542  else
543  Info("TPacketizerAdaptive",
544  "The only accepted value of PROOF_ForceLocal parameter is 1 !");
545  }
546 
547  // Below we provide a possibility to change the way packet size is
548  // calculated or define the packet time directly.
549  // fPacketAsAFraction can be interpreted as follows:
550  // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
551  // It substitutes 20 in the old formula to calculate the fPacketSize:
552  // fPacketSize = fTotalEntries / (20 * nslaves)
553  Int_t packetAsAFraction = 0;
554  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
555  if (packetAsAFraction > 0) {
556  fPacketAsAFraction = packetAsAFraction;
557  Info("TPacketizerAdaptive",
558  "using alternate fraction of query time as a packet size: %d",
559  packetAsAFraction);
560  } else
561  Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
562  }
563 
564 // Packet re-assignment
565  fTryReassign = 0;
566  Int_t tryReassign = 0;
567  if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0)
568  tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0);
569  fTryReassign = tryReassign;
570  if (fTryReassign != 0)
571  Info("TPacketizerAdaptive", "failed packets will be re-assigned");
572 
573  // Save the config parameters in the dedicated list so that they will be saved
574  // in the outputlist and therefore in the relevant TQueryResult
575  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
576  fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
577  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
578  fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
579  fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
580  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
581 
582  Double_t baseLocalPreference = 1.2;
583  fBaseLocalPreference = (Float_t)baseLocalPreference;
584  if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0)
585  fBaseLocalPreference = (Float_t)baseLocalPreference;
586 
587  fFileNodes = new TList;
588  fFileNodes->SetOwner();
589  fUnAllocated = new TList;
591  fActive = new TList;
593 
594  fValid = kTRUE;
595 
596 // Resolve end-point urls to optimize distribution
597  // dset->Lookup(); // moved to TProofPlayerRemote::Process
598 
599  // Read list of mounted disks
600  TObjArray *partitions = 0;
601  TString partitionsStr;
602  if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
603  partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
604  if (!partitionsStr.IsNull()) {
605  Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
606  partitions = partitionsStr.Tokenize(",");
607  }
608 
609  // Split into per host and disk entries
610  dset->Reset();
611  TDSetElement *e;
612  while ((e = (TDSetElement*)dset->Next())) {
613 
614  if (e->GetValid()) continue;
615 
616  // The dataset name, if any
617  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
618  fDataSet = e->GetDataSet();
619 
620  TUrl url = e->GetFileName();
621  PDB(kPacketizer,2)
622  Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
623 
624  // Map non URL filenames to dummy host
625  TString host;
626  if ( !url.IsValid() ||
627  (strncmp(url.GetProtocol(),"root", 4) &&
628  strncmp(url.GetProtocol(),"rfio", 4) &&
629  strncmp(url.GetProtocol(),"file", 4)) ) {
630  host = "no-host";
631  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
632  host = "localhost";
633  url.SetProtocol("root");
634  } else {
635  host = url.GetHostFQDN();
636  }
637  // Get full name for local hosts
638  if (host.Contains("localhost") || host == "127.0.0.1") {
639  url.SetHost(gSystem->HostName());
640  host = url.GetHostFQDN();
641  }
642 
643  // Find on which disk is the file, if any
644  TString disk;
645  if (partitions) {
646  TIter iString(partitions);
647  TObjString* os = 0;
648  while ((os = (TObjString *)iString())) {
649 // Compare the beginning of the url with the disk mountpoint
650  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
651  disk = os->GetName();
652  break;
653  }
654  }
655  }
656  // Node's url
657  TString nodeStr;
658  if (disk.IsNull())
659  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
660  else
661  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
662  TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
663 
664  if (node == 0) {
665  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
666  fFileNodes->Add(node);
667  PDB(kPacketizer,2)
668  Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
669  } else {
670  PDB(kPacketizer,2)
671  Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
672  }
673 
674  node->Add(e, kFALSE);
675  }
676 
677  fSlaveStats = new TMap;
679 
680  TSlave *slave;
681  TIter si(slaves);
682  while ((slave = (TSlave*) si.Next())) {
683  fSlaveStats->Add( slave, new TSlaveStat(slave) );
684  fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
685  slave->GetPerfIdx() : fMaxPerfIdx;
686  }
687 
688  // Setup file & filenode structure
689  Reset();
690  // Optimize the number of files to be open when running on subsample
691  Int_t validateMode = 0;
692  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
693  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
694  if (num > -1)
695  PDB(kPacketizer,2)
696  Info("TPacketizerAdaptive",
697  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
698  ValidateFiles(dset, slaves, num, byfile);
699 
700 
701  if (!fValid) return;
702 
703 // apply global range (first,num) to dset and rebuild structure
704 // omitting TDSet elements that are not needed
705 
706  Int_t files = 0;
707  fTotalEntries = 0;
708  fUnAllocated->Clear(); // avoid dangling pointers
709  fActive->Clear();
710  fFileNodes->Clear(); // then delete all objects
711  PDB(kPacketizer,2)
712  Info("TPacketizerAdaptive",
713  "processing range: first %lld, num %lld", first, num);
714 
715  dset->Reset();
716  Long64_t cur = 0;
717  while (( e = (TDSetElement*)dset->Next())) {
718 
719  // Skip invalid or missing file; It will be moved
720  // from the dset to the 'MissingFiles' list in the player.
721  if (!e->GetValid()) continue;
722 
723  TUrl url = e->GetFileName();
724  Long64_t eFirst = e->GetFirst();
725  Long64_t eNum = e->GetNum();
726  PDB(kPacketizer,2)
727  Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
728  PDB(kPacketizer,2)
729  Info("TPacketizerAdaptive",
730  " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
731 
732  if (!e->GetEntryList()) {
733  // This element is before the start of the global range, skip it
734  if (cur + eNum < first) {
735  cur += eNum;
736  PDB(kPacketizer,2)
737  Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
738  continue;
739  }
740 
741  // This element is after the end of the global range, skip it
742  if (num != -1 && (first+num <= cur)) {
743  cur += eNum;
744  PDB(kPacketizer,2)
745  Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
746  continue; // break ??
747  }
748 
749  Bool_t inRange = kFALSE;
750  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
751 
752  if (cur <= first) {
753  // If this element contains the start of the global range
754  // adjust its start and number of entries
755  e->SetFirst( eFirst + (first - cur) );
756  e->SetNum( e->GetNum() - (first - cur) );
757  PDB(kPacketizer,2)
758  Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
759  eFirst + (first - cur), first + num - cur);
760  inRange = kTRUE;
761  }
762  if (num != -1 && (first+num <= cur+eNum)) {
763  // If this element contains the end of the global range
764  // adjust its number of entries
765  e->SetNum( first + num - e->GetFirst() - cur );
766  PDB(kPacketizer,2)
767  Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
768  inRange = kTRUE;
769  }
770 
771  } else {
772  // Increment the counter ...
773  PDB(kPacketizer,2)
774  Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
775  cur += eNum;
776  }
777  // Re-adjust eNum and cur, if needed
778  if (inRange) {
779  cur += eNum;
780  eNum = e->GetNum();
781  }
782 
783  } else {
784  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
785  if (enl) {
786  eNum = enl->GetN();
787  PDB(kPacketizer,2)
788  Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
789  } else {
790  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
791  eNum = evl ? evl->GetN() : eNum;
792  PDB(kPacketizer,2)
793  Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
794  }
795  if (!eNum) {
796  PDB(kPacketizer,2)
797  Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
798  continue;
799  }
800  }
801  PDB(kPacketizer,2)
802  Info("TPacketizerAdaptive", " --> next cur %lld", cur);
803 
804  // Map non URL filenames to dummy host
805  TString host;
806  if ( !url.IsValid() ||
807  (strncmp(url.GetProtocol(),"root", 4) &&
808  strncmp(url.GetProtocol(),"rfio", 4) &&
809  strncmp(url.GetProtocol(),"file", 4)) ) {
810  host = "no-host";
811  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
812  host = "localhost";
813  url.SetProtocol("root");
814  } else {
815  host = url.GetHostFQDN();
816  }
817  // Get full name for local hosts
818  if (host.Contains("localhost") || host == "127.0.0.1") {
819  url.SetHost(gSystem->HostName());
820  host = url.GetHostFQDN();
821  }
822 
823  // Find, on which disk is the file
824  TString disk;
825  if (partitions) {
826  TIter iString(partitions);
827  TObjString* os = 0;
828  while ((os = (TObjString *)iString())) {
829 // Compare the beginning of the url with the disk mountpoint
830  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
831  disk = os->GetName();
832  break;
833  }
834  }
835  }
836  // Node's url
837  TString nodeStr;
838  if (disk.IsNull())
839  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
840  else
841  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
842  TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
843 
844 
845  if (node == 0) {
846  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
847  fFileNodes->Add( node );
848  PDB(kPacketizer, 2)
849  Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
850  } else {
851  PDB(kPacketizer, 2)
852  Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
853  }
854 
855  ++files;
856  fTotalEntries += eNum;
857  node->Add(e, kTRUE);
858  node->IncEvents(eNum);
859  PDB(kPacketizer,2) e->Print("a");
860  }
861  PDB(kPacketizer,1)
862  Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
863  fTotalEntries, files, fFileNodes->GetSize());
864 
865  // Set the total number for monitoring
866  if (gPerfStats)
867  gPerfStats->SetNumEvents(fTotalEntries);
868 
869  Reset();
870 
871  InitStats();
872 
873  if (!fValid)
875 
876  PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
877 }
878 
879 ////////////////////////////////////////////////////////////////////////////////
880 /// Destructor.
881 
883 {
884  if (fSlaveStats) {
886  }
887 
892  SafeDelete(fFilesToProcess);
893 }
894 
895 ////////////////////////////////////////////////////////////////////////////////
896 /// (re)initialise the statistics
897 /// called at the beginning or after a worker dies.
898 
900 {
901  // calculating how many files from TDSet are not cached on
902  // any slave
903  Int_t noRemoteFiles = 0;
904  fNEventsOnRemLoc = 0;
905  Int_t totalNumberOfFiles = 0;
907  while (TFileNode *fn = (TFileNode*)next()) {
908  totalNumberOfFiles += fn->GetNumberOfFiles();
909  if (fn->GetMySlaveCnt() == 0) {
910  noRemoteFiles += fn->GetNumberOfFiles();
911  fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
912  }
913  }
914 
915  if (totalNumberOfFiles == 0) {
916  Info("InitStats", "no valid or non-empty file found: setting invalid");
917  // No valid files: set invalid and return
918  fValid = kFALSE;
919  return;
920  }
921 
922  fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
923  Info("InitStats",
924  "fraction of remote files %f", fFractionOfRemoteFiles);
925 
926  if (!fValid)
928 
929  PDB(kPacketizer,1) Info("InitStats", "return");
930 }
931 
932 ////////////////////////////////////////////////////////////////////////////////
933 /// Get next unallocated file from 'node' or other nodes:
934 /// First try 'node'. If there is no more files, keep trying to
935 /// find an unallocated file on other nodes.
936 
937 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
938 {
939  TFileStat *file = 0;
940 
941  if (node != 0) {
942  PDB(kPacketizer, 2)
943  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
944  file = node->GetNextUnAlloc();
945  if (file == 0) RemoveUnAllocNode(node);
946  } else {
947  if (nodeHostName && strlen(nodeHostName) > 0) {
948 
949  TFileNode *fn;
950  // Make sure that they are in the corrected order
951  fUnAllocated->Sort();
952  PDB(kPacketizer,2) fUnAllocated->Print();
953 
954  // Loop over unallocated fileNode list
955  for (int i = 0; i < fUnAllocated->GetSize(); i++) {
956 
957  if ((fn = (TFileNode *) fUnAllocated->At(i))) {
958  TUrl uu(fn->GetName());
959  PDB(kPacketizer, 2)
960  Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
961 
962  // Check, whether node's hostname is matching with current fileNode (fn)
963  if (!strcmp(nodeHostName, uu.GetHost())) {
964  node = fn;
965 
966  // Fetch next unallocated file from this node
967  if ((file = node->GetNextUnAlloc()) == 0) {
968  RemoveUnAllocNode(node);
969  node = 0;
970  } else {
971  PDB(kPacketizer, 2)
972  Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
973  break;
974  }
975  }
976  } else {
977  Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
978  }
979  }
980 
981  if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
982  // Unlike in TPacketizer we look at the number of ext slaves only.
983  PDB(kPacketizer,1)
984  Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
985  node = 0;
986  }
987  }
988 
989  if (node == 0) {
990  while (file == 0 && ((node = NextNode()) != 0)) {
991  PDB(kPacketizer, 2)
992  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
993  if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
994  }
995  }
996  }
997 
998  if (file != 0) {
999  // if needed make node active
1000  if (fActive->FindObject(node) == 0) {
1001  fActive->Add(node);
1002  }
1003  }
1004 
1005  PDB(kPacketizer, 2) {
1006  if (!file) {
1007  Info("GetNextUnAlloc", "no file found!");
1008  } else {
1009  file->Print();
1010  }
1011  }
1012 
1013  return file;
1014 }
1015 
1016 ////////////////////////////////////////////////////////////////////////////////
1017 /// Get next node which has unallocated files.
1018 /// the order is determined by TFileNode::Compare
1019 
1020 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
1021 {
1022  fUnAllocated->Sort();
1023  PDB(kPacketizer,2) {
1024  fUnAllocated->Print();
1025  }
1026 
1027  TFileNode *fn = (TFileNode*) fUnAllocated->First();
1028  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1029  // unlike in TPacketizer we look at the number of ext slaves only.
1030  PDB(kPacketizer,1)
1031  Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1032  fn = 0;
1033  }
1034 
1035  return fn;
1036 }
1037 
1038 ////////////////////////////////////////////////////////////////////////////////
1039 /// Remove unallocated node.
1040 
1042 {
1043  fUnAllocated->Remove(node);
1044 }
1045 
1046 ////////////////////////////////////////////////////////////////////////////////
1047 /// Get next active file.
1048 
1049 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
1050 {
1051  TFileNode *node;
1052  TFileStat *file = 0;
1053 
1054  while (file == 0 && ((node = NextActiveNode()) != 0)) {
1055  file = node->GetNextActive();
1056  if (file == 0) RemoveActiveNode(node);
1057  }
1058 
1059  return file;
1060 }
1061 
1062 
1063 ////////////////////////////////////////////////////////////////////////////////
1064 /// Get next active node.
1065 
1066 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
1067 {
1068  fActive->Sort();
1069  PDB(kPacketizer,2) {
1070  Info("NextActiveNode", "enter");
1071  fActive->Print();
1072  }
1073 
1074  TFileNode *fn = (TFileNode*) fActive->First();
1075  // look at only ext slaves
1076  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1077  PDB(kPacketizer,1)
1078  Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1079  fn = 0;
1080  }
1081 
1082  return fn;
1083 }
1084 
1085 ////////////////////////////////////////////////////////////////////////////////
1086 /// Remove file from the list of actives.
1087 
1089 {
1090  TFileNode *node = file->GetNode();
1091 
1092  node->RemoveActive(file);
1093  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1094 }
1095 
1096 ////////////////////////////////////////////////////////////////////////////////
1097 /// Remove node from the list of actives.
1098 
1100 {
1101  fActive->Remove(node);
1102 }
1103 
1104 ////////////////////////////////////////////////////////////////////////////////
1105 /// Reset the internal data structure for packet distribution.
1106 
1108 {
1109  fUnAllocated->Clear();
1111 
1112  fActive->Clear();
1113 
1114  TIter files(fFileNodes);
1115  TFileNode *fn;
1116  while ((fn = (TFileNode*) files.Next()) != 0) {
1117  fn->Reset();
1118  }
1119 
1120  TIter slaves(fSlaveStats);
1121  TObject *key;
1122  while ((key = slaves.Next()) != 0) {
1123  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1124  if (!slstat) {
1125  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
1126  continue;
1127  }
1128  // Find out which file nodes are on the worker machine and assign the
1129  // one with less workers assigned
1130  TFileNode *fnmin = 0;
1131  Int_t fncnt = fSlaveStats->GetSize();
1132  files.Reset();
1133  while ((fn = (TFileNode*) files.Next()) != 0) {
1134  if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1135  if (fn->GetMySlaveCnt() < fncnt) {
1136  fnmin = fn;
1137  fncnt = fn->GetMySlaveCnt();
1138  }
1139  }
1140  }
1141  if (fnmin != 0 ) {
1142  slstat->SetFileNode(fnmin);
1143  fnmin->IncMySlaveCnt();
1144  PDB(kPacketizer, 2)
1145  Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1146  fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1147  }
1148  slstat->fCurFile = 0;
1149  }
1150 }
1151 
1152 ////////////////////////////////////////////////////////////////////////////////
1153 /// Check existence of file/dir/tree and get number of entries.
1154 /// Assumes the files have been setup.
1155 
1157  Long64_t maxent, Bool_t byfile)
1158 {
1159  TMap slaves_by_sock;
1160  TMonitor mon;
1161  TList workers;
1162 
1163 
1164  // Setup the communication infrastructure
1165 
1166  workers.AddAll(slaves);
1167  TIter si(slaves);
1168  TSlave *slm;
1169  while ((slm = (TSlave*)si.Next()) != 0) {
1170  PDB(kPacketizer,3)
1171  Info("ValidateFiles","socket added to monitor: %p (%s)",
1172  slm->GetSocket(), slm->GetName());
1173  mon.Add(slm->GetSocket());
1174  slaves_by_sock.Add(slm->GetSocket(), slm);
1175  }
1176 
1177  mon.DeActivateAll();
1178 
1179  ((TProof*)gProof)->DeActivateAsyncInput();
1180 
1181  // Some monitoring systems (TXSocketHandler) need to know this
1182  ((TProof*)gProof)->fCurrentMonitor = &mon;
1183 
1184  // Identify the type
1185  if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1186 
1187  // Preparing for client notification
1188  TString msg("Validating files");
1189  UInt_t n = 0;
1190  UInt_t tot = dset->GetListOfElements()->GetSize();
1191  Bool_t st = kTRUE;
1192 
1193  Long64_t totent = 0, nopenf = 0;
1194  while (kTRUE) {
1195 
1196  // send work
1197  while (TSlave *s = (TSlave *)workers.First()) {
1198 
1199  workers.Remove(s);
1200 
1201  // find a file
1202 
1203  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1204  if (!slstat) {
1205  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
1206  continue;
1207  }
1208 
1209  TFileNode *node = 0;
1210  TFileStat *file = 0;
1211 
1212  // try its own node first
1213  if ((node = slstat->GetFileNode()) != 0) {
1214  PDB(kPacketizer,3) node->Print();
1215  file = GetNextUnAlloc(node);
1216  if (file == 0)
1217  slstat->SetFileNode(0);
1218  }
1219 
1220  // look for a file on any other node if necessary
1221  if (file == 0)
1222  file = GetNextUnAlloc();
1223 
1224  if (file != 0) {
1225  // files are done right away
1226  RemoveActive(file);
1227 
1228  slstat->fCurFile = file;
1229  TDSetElement *elem = file->GetElement();
1230  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1231  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1232  // This is decremented when we get the reply
1233  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1235  m << dset->IsTree()
1236  << TString(elem->GetFileName())
1237  << TString(elem->GetDirectory())
1238  << TString(elem->GetObjName());
1239 
1240  s->GetSocket()->Send( m );
1241  mon.Activate(s->GetSocket());
1242  PDB(kPacketizer,2)
1243  Info("ValidateFiles",
1244  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1245  s->GetOrdinal(), s->GetName(), s->GetSocket(),
1246  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1247  elem->GetDirectory(), elem->GetObjName());
1248  } else {
1249  // Fill the info
1250  elem->SetTDSetOffset(entries);
1251  if (entries > 0) {
1252  // Most likely valid
1253  elem->SetValid();
1254  if (!elem->GetEntryList()) {
1255  if (elem->GetFirst() > entries) {
1256  Error("ValidateFiles",
1257  "first (%lld) higher then number of entries (%lld) in %s",
1258  elem->GetFirst(), entries, elem->GetFileName());
1259  // disable element
1260  slstat->fCurFile->SetDone();
1261  elem->Invalidate();
1262  dset->SetBit(TDSet::kSomeInvalid);
1263  }
1264  if (elem->GetNum() == -1) {
1265  elem->SetNum(entries - elem->GetFirst());
1266  } else if (elem->GetFirst() + elem->GetNum() > entries) {
1267  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
1268  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1269  entries, elem->GetFileName());
1270  elem->SetNum(entries - elem->GetFirst());
1271  }
1272  PDB(kPacketizer,2)
1273  Info("ValidateFiles",
1274  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1275  }
1276  }
1277  // Count
1278  totent += entries;
1279  nopenf++;
1280  // Notify the client
1281  n++;
1282  gProof->SendDataSetStatus(msg, n, tot, st);
1283 
1284  // This worker is ready for the next validation
1285  workers.Add(s);
1286  }
1287  }
1288  }
1289 
1290  // Check if there is anything to wait for
1291  if (mon.GetActive() == 0) {
1292  if (byfile && maxent > 0) {
1293  // How many files do we still need ?
1294  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1295  if (nrestf <= 0 && maxent > totent) nrestf = 1;
1296  if (nrestf > 0) {
1297  PDB(kPacketizer,3)
1298  Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
1299  maxent, totent, nopenf, nrestf);
1300  si.Reset();
1301  while ((slm = (TSlave *) si.Next()) && nrestf--) {
1302  workers.Add(slm);
1303  }
1304  continue;
1305  } else {
1306  PDB(kPacketizer,3)
1307  Info("ValidateFiles", "no need to validate more files");
1308  break;
1309  }
1310  } else {
1311  break;
1312  }
1313  }
1314 
1315  PDB(kPacketizer,3) {
1316  Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
1317  TList *act = mon.GetListOfActives();
1318  TIter next(act);
1319  while (TSocket *s = (TSocket*) next()) {
1320  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1321  if (sl)
1322  Info("ValidateFiles", " worker-%s (%s)",
1323  sl->GetOrdinal(), sl->GetName());
1324  }
1325  delete act;
1326  }
1327 
1328  TSocket *sock = mon.Select();
1329  // If we have been interrupted break
1330  if (!sock) {
1331  Error("ValidateFiles", "selection has been interrupted - STOP");
1332  mon.DeActivateAll();
1333  fValid = kFALSE;
1334  break;
1335  }
1336  mon.DeActivate(sock);
1337 
1338  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
1339 
1340  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1341  if (!sock->IsValid()) {
1342  // A socket got invalid during validation
1343  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
1344  slave->GetOrdinal(), slave->GetName());
1345  ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1346  fValid = kFALSE;
1347  break;
1348  }
1349 
1350  TMessage *reply;
1351 
1352  if (sock->Recv(reply) <= 0) {
1353  // Notify
1354  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
1355  slave->GetOrdinal(), slave->GetName());
1356  // Help! lost a slave? ('slave' is deleted inside here ...)
1357  ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
1358  fValid = kFALSE;
1359  continue;
1360  }
1361 
1362  if (reply->What() != kPROOF_GETENTRIES) {
1363  // Not what we want: handover processing to the central machinery
1364  Int_t what = reply->What();
1365  ((TProof*)gProof)->HandleInputMessage(slave, reply);
1366  if (what == kPROOF_FATAL) {
1367  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
1368  slave->GetOrdinal(), slave->GetName());
1369  fValid = kFALSE;
1370  } else {
1371  // Reactivate the socket
1372  mon.Activate(sock);
1373  }
1374  // Get next message
1375  continue;
1376  }
1377 
1378  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1379  TDSetElement *e = slavestat->fCurFile->GetElement();
1380  slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1381  Long64_t entries;
1382 
1383  (*reply) >> entries;
1384 
1385  // Extract object name, if there
1386  if ((reply->BufferSize() > reply->Length())) {
1387  TString objname;
1388  (*reply) >> objname;
1389  e->SetTitle(objname);
1390  }
1391 
1392  e->SetTDSetOffset(entries);
1393  if (entries > 0) {
1394 
1395  // This dataset element is most likely valid
1396  e->SetValid();
1397 
1398  if (!e->GetEntryList()) {
1399  if (e->GetFirst() > entries) {
1400  Error("ValidateFiles",
1401  "first (%lld) higher then number of entries (%lld) in %s",
1402  e->GetFirst(), entries, e->GetFileName());
1403 
1404  // Invalidate the element
1405  slavestat->fCurFile->SetDone();
1406  e->Invalidate();
1407  dset->SetBit(TDSet::kSomeInvalid);
1408  }
1409 
1410  if (e->GetNum() == -1) {
1411  e->SetNum(entries - e->GetFirst());
1412  } else if (e->GetFirst() + e->GetNum() > entries) {
1413  Error("ValidateFiles",
1414  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1415  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1416  e->SetNum(entries - e->GetFirst());
1417  }
1418  }
1419 
1420  // Count
1421  totent += entries;
1422  nopenf++;
1423 
1424  // Notify the client
1425  n++;
1426  gProof->SendDataSetStatus(msg, n, tot, st);
1427 
1428  } else {
1429 
1430  Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
1431  //
1432  // Need to fix this with a user option to allow incomplete file sets (rdm)
1433  //
1434  //fValid = kFALSE; // all element must be readable!
1435  if (gProofServ) {
1437  m << TString(Form("Cannot get entries for file: %s - skipping",
1438  e->GetFileName()));
1439  gProofServ->GetSocket()->Send(m);
1440  }
1441 
1442  // invalidate element
1443  e->Invalidate();
1444  dset->SetBit(TDSet::kSomeInvalid);
1445  }
1446  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1447 
1448  // Ready for the next job, unless we have enough files
1449  if (maxent < 0 || ((totent < maxent) && !byfile))
1450  workers.Add(slave);
1451  }
1452 
1453  // report std. output from slaves??
1454 
1455  ((TProof*)gProof)->ActivateAsyncInput();
1456 
1457  // This needs to be reset
1458  ((TProof*)gProof)->fCurrentMonitor = 0;
1459 
1460  // No reason to continue if invalid
1461  if (!fValid)
1462  return;
1463 
1464  // compute the offset for each file element
1465  Long64_t offset = 0;
1466  Long64_t newOffset = 0;
1467  TIter next(dset->GetListOfElements());
1468  TDSetElement *el;
1469  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1470  if (el->GetValid()) {
1471  newOffset = offset + el->GetTDSetOffset();
1472  el->SetTDSetOffset(offset);
1473  offset = newOffset;
1474  }
1475  }
1476 }
1477 
1478 ////////////////////////////////////////////////////////////////////////////////
1479 /// The result depends on the fStrategy
1480 
1482 {
1483  Long64_t num;
1484  if (fStrategy == 0) {
1485  // TPacketizer's heuristic for starting packet size
1486  // Constant packet size;
1487  Int_t nslaves = fSlaveStats->GetSize();
1488  if (nslaves > 0) {
1489  num = fTotalEntries / (fPacketAsAFraction * nslaves);
1490  } else {
1491  num = 1;
1492  }
1493  } else {
1494  // The dynamic heuristic for setting the packet size (default)
1495  // Calculates the packet size based on performance of this slave
1496  // and estimated time left until the end of the query.
1497  TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1498  Float_t rate = slstat->GetCurRate();
1499  if (!rate)
1500  rate = slstat->GetAvgRate();
1501  if (rate) {
1502 
1503  // Global average rate
1504  Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
1505  Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1506 
1507  // Bytes-to-Event conversion
1508  Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1509 
1510  // Make sure it is not smaller then the cache, if the info is available and the size
1511  // synchronization is required. But apply the cache-packet size synchronization only if there
1512  // are enough left files to process and the files are all of similar sizes. Otherwise we risk
1513  // to not exploit optimally all potentially active workers.
1514  Bool_t cpsync = fCachePacketSync;
1515  if (fMaxEntriesRatio > 0. && cpsync) {
1516  if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
1517  Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
1518  Long64_t maxEntries = -1;
1519  if (fFilesToProcess->Last()) {
1520  TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
1521  if (elem) maxEntries = elem->GetNum();
1522  }
1523  if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1524  PDB(kPacketizer,3) {
1525  Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1526  Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1527  slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1528  (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1529  }
1530  cpsync = kFALSE;
1531  }
1532  }
1533  }
1534  if (bevt > 0. && cachesz > 0 && cpsync) {
1535  if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1536  packetTime = cachesz / bevt / rate;
1537  }
1538 
1539  // Apply min-max again, if required
1540  if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1541  if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1542 
1543  // Translate the packet length in number of entries
1544  num = (Long64_t)(rate * packetTime);
1545 
1546  // Notify
1547  PDB(kPacketizer,2)
1548  Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1549  slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1550  packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1551 
1552  } else {
1553  // First packet for this worker in this query
1554  // Twice the learning phase
1555  num = (learnent > 0) ? 5 * learnent : 1000;
1556 
1557  // Notify
1558  PDB(kPacketizer,2)
1559  Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1560  }
1561  }
1562  if (num < 1) num = 1;
1563  return num;
1564 }
1565 
1566 ////////////////////////////////////////////////////////////////////////////////
1567 /// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
1568 /// message (when the worker was asked to stop processing during a packet).
1569 /// returns the #entries intended in the last packet - #processed entries
1570 
1573  Double_t latency,
1574  TList **listOfMissingFiles)
1575 {
1576  // find slave
1577  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1578  if (!slstat) {
1579  Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!",
1580  (sl ? sl->GetOrdinal() : "x.x"),
1581  (sl ? sl->GetName() : "**undef**"));
1582  return -1;
1583  }
1584 
1585  // update stats & free old element
1586 
1587  if ( slstat->fCurElem != 0 ) {
1588  Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1589  // Calculate the number of events processed in the last packet
1590  Long64_t numev;
1591  if (status && status->GetEntries() > 0)
1592  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1593  else
1594  numev = 0;
1595 
1596  // Calculate the progress made in the last packet
1597  TProofProgressStatus *progress = 0;
1598  if (numev > 0) {
1599  // This also moves the pointer in the corrsponding TFileInfo
1600  progress = slstat->AddProcessed(status);
1601  if (progress) {
1602  (*fProgressStatus) += *progress;
1603  // update processing rate
1604  slstat->UpdateRates(status);
1605  }
1606  } else {
1607  progress = new TProofProgressStatus();
1608  }
1609  if (progress) {
1610  PDB(kPacketizer,2)
1611  Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1612  sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1613  progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1614 
1615  if (gPerfStats)
1616  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1617  slstat->fCurElem->GetFileName(),
1618  progress->GetEntries(),
1619  latency,
1620  progress->GetProcTime(),
1621  progress->GetCPUTime(),
1622  progress->GetBytesRead());
1623  delete progress;
1624  }
1625  if (numev != expectedNumEv) {
1626  // The last packet was not fully processed
1627  // and will be split in two:
1628  // - The completed part was marked as done.
1629  // - Create a new packet with the part to be resubmitted.
1630  TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
1631  if (newPacket && numev < expectedNumEv) {
1632  Long64_t first = newPacket->GetFirst();
1633  newPacket->SetFirst(first + numev);
1634  if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1635  SafeDelete(newPacket);
1636  } else
1637  Error("AddProcessed", "%s: processed too much? (%lld, %lld)",
1638  sl->GetOrdinal(), numev, expectedNumEv);
1639 
1640  // TODO: a signal handler which will send info from the worker
1641  // after a packet fails.
1642  /* Add it to the failed packets list.
1643  if (!fFailedPackets) {
1644  fFailedPackets = new TList();
1645  }
1646  fFailedPackets->Add(slstat->fCurElem);
1647  */
1648  }
1649 
1650  slstat->fCurElem = 0;
1651  return (expectedNumEv - numev);
1652  } else {
1653  // the kPROOF_STOPPRPOCESS message is send after the worker receives zero
1654  // as the reply to kPROOF_GETNEXTPACKET
1655  return -1;
1656  }
1657 }
1658 
1659 ////////////////////////////////////////////////////////////////////////////////
1660 /// Get next packet;
1661 /// A meaningful difference to TPacketizer is the fact that this
1662 /// packetizer, for each worker, tries to predict whether the worker
1663 /// will finish processing its local files before the end of the query.
1664 /// If yes, it allocates, to those workers, files from non-slave filenodes
1665 /// or from slaves that are overloaded. The check is done every time a new
1666 /// file needs to be assigned.
1667 
1669 {
1670  if ( !fValid ) {
1671  return 0;
1672  }
1673 
1674  // find slave
1675 
1676  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1677  if (!slstat) {
1678  Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
1679  (sl ? sl->GetName() : "**undef**"));
1680  return 0;
1681  }
1682 
1683  // Attach to current file
1684  TFileStat *file = slstat->fCurFile;
1685 
1686  // Update stats & free old element
1687 
1688  Bool_t firstPacket = kFALSE;
1689  Long64_t cachesz = -1;
1690  Int_t learnent = -1;
1691  if ( slstat->fCurElem != 0 ) {
1692 
1693  Long64_t restEntries = 0;
1694  Double_t latency, proctime, proccpu;
1696  Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
1697 
1698  if (sl->GetProtocol() > 18) {
1699 
1700  (*r) >> latency;
1701  (*r) >> status;
1702 
1703  if (sl->GetProtocol() > 25) {
1704  (*r) >> cachesz >> learnent;
1705  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1706  }
1707  fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1708  fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
1709 
1710  } else {
1711 
1712  Long64_t bytesRead = -1;
1713 
1714  (*r) >> latency >> proctime >> proccpu;
1715  // only read new info if available
1716  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1717  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1718  Long64_t totev = 0;
1719  if (r->BufferSize() > r->Length()) (*r) >> totev;
1720 
1721  status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
1722  fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
1723  }
1724 
1725  if (!fileNotOpen && !fileCorrupted) {
1726  if (AddProcessed(sl, status, latency) != 0)
1727  Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal());
1730  Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)",
1732  // Send last timer message and stop the timer
1733  HandleTimer(0);
1735  }
1736  } else {
1737  if (file) {
1738  if (file->GetElement()) {
1739  if (fileCorrupted) {
1740  Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)",
1741  sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1742  Int_t nunproc = AddProcessed(sl, status, latency);
1743  PDB(kPacketizer,1)
1744  Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
1745  // Remaining to be processed
1746  Long64_t num = 0;
1747  if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1748  // Add the remainign entries in the packet to the ones already registered
1749  num = file->GetElement()->GetEntries() + restEntries;
1750  } else {
1751  // First call: add the remaining entries in the packet and those of the file
1752  // not yet assigned
1753  Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1754  num = restEntries + rest;
1755  }
1756  file->GetElement()->SetEntries(num);
1757  PDB(kPacketizer,1)
1758  Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1759  file->GetElement()->GetName(), file->GetElement()->GetEntries());
1760  // Flag as corrupted
1761  file->GetElement()->SetBit(TDSetElement::kCorrupted);
1762  } else {
1763  Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element",
1764  sl->GetOrdinal(), file->GetElement()->GetName());
1765  }
1766  // Invalidate the element
1767  file->GetElement()->Invalidate();
1768  // Add it to the failed packets list
1769  if (!fFailedPackets) fFailedPackets = new TList();
1770  if (!fFailedPackets->FindObject(file->GetElement()))
1771  fFailedPackets->Add(file->GetElement());
1772  }
1773  // Deactivate this TFileStat
1774  file->SetDone();
1775  RemoveActive(file);
1776  } else {
1777  Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:"
1778  " protocol error?", sl->GetOrdinal());
1779  }
1780  }
1781  } else {
1782  firstPacket = kTRUE;
1783  }
1784 
1785  if ( fStop ) {
1786  HandleTimer(0);
1787  return 0;
1788  }
1789 
1790  TString nodeName;
1791  if (file != 0) nodeName = file->GetNode()->GetName();
1792  TString nodeHostName(slstat->GetName());
1793 
1794  PDB(kPacketizer,3)
1795  Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1796  sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
1797 
1798  // If current file is just finished
1799  if ( file != 0 && file->IsDone() ) {
1800  file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1801  file->GetNode()->DecRunSlaveCnt();
1802  if (gPerfStats)
1803  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1804  file->GetElement()->GetFileName(), kFALSE);
1805  file = 0;
1806  }
1807  // Reset the current file field
1808  slstat->fCurFile = file;
1809 
1810  Long64_t avgEventsLeftPerSlave =
1813  return 0;
1814  // Get a file if needed
1815  if ( file == 0) {
1816  // Needs a new file
1817  Bool_t openLocal;
1818  // Aiming for localPreference == 1 when #local == #remote events left
1819  Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1820  (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
1821  if ( slstat->GetFileNode() != 0 ) {
1822  // Local file node exists and has more events to process.
1823  fUnAllocated->Sort();
1824  TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1825  Bool_t nonLocalNodePossible;
1826  if (fForceLocal)
1827  nonLocalNodePossible = 0;
1828  else
1829  nonLocalNodePossible = firstNonLocalNode ?
1830  (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1831  : 0;
1832  openLocal = !nonLocalNodePossible;
1833  Float_t slaveRate = slstat->GetAvgRate();
1834  if ( nonLocalNodePossible && fStrategy == 1) {
1835  // OpenLocal is set to kFALSE
1836  if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1837  slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1838  // External slaves help slstat -> don't open nonlocal files
1839  // -1 because, at this point slstat is not running
1840  openLocal = kTRUE;
1841  else if ( slaveRate == 0 ) { // first file for this slave
1842  // GetLocalEventsLeft() counts the potential slave
1843  // as running on its fileNode.
1844  if ( slstat->GetLocalEventsLeft() * localPreference
1845  > (avgEventsLeftPerSlave))
1846  openLocal = kTRUE;
1847  else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1848  < slstat->GetLocalEventsLeft() * localPreference )
1849  openLocal = kTRUE;
1850  else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1851  openLocal = kTRUE;
1852  else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1853  openLocal = kTRUE;
1854  } else {
1855  // At this point slstat has a non zero avg rate > 0
1856  Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1857  // And thus fCumProcTime, fProcessed > 0
1858  Float_t avgTime = avgEventsLeftPerSlave
1860  if (slaveTime * localPreference > avgTime)
1861  openLocal = kTRUE;
1862  else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1863  < slstat->GetLocalEventsLeft() * localPreference)
1864  openLocal = kTRUE;
1865  }
1866  }
1867  if (openLocal || fStrategy == 0) {
1868  // Try its own node
1869  file = slstat->GetFileNode()->GetNextUnAlloc();
1870  if (!file)
1871  file = slstat->GetFileNode()->GetNextActive();
1872  if ( file == 0 ) {
1873  // No more files on this worker
1874  slstat->SetFileNode(0);
1875  }
1876  }
1877  }
1878 
1879  // Try to find an unused filenode first
1880  if(file == 0 && !fForceLocal)
1881  file = GetNextUnAlloc(0, nodeHostName);
1882 
1883  // Then look at the active filenodes
1884  if(file == 0 && !fForceLocal)
1885  file = GetNextActive();
1886 
1887  if (file == 0) return 0;
1888 
1889  PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1890 
1891  slstat->fCurFile = file;
1892  // if remote and unallocated file
1893  if (file->GetNode()->GetMySlaveCnt() == 0 &&
1894  file->GetElement()->GetFirst() == file->GetNextEntry()) {
1895  fNEventsOnRemLoc -= file->GetElement()->GetNum();
1896  if (fNEventsOnRemLoc < 0) {
1897  Error("GetNextPacket",
1898  "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1900  return 0;
1901  }
1902  }
1903  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1904  file->GetNode()->IncRunSlaveCnt();
1905  if (gPerfStats)
1906  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1907  file->GetNode()->GetName(),
1908  file->GetElement()->GetFileName(), kTRUE);
1909  }
1910 
1911  Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1912 
1913  // Get a packet
1914 
1915  TDSetElement *base = file->GetElement();
1916  Long64_t first = file->GetNextEntry();
1917  Long64_t last = base->GetFirst() + base->GetNum();
1918 
1919  // If the remaining part is smaller than the (packetsize * 1.5)
1920  // then increase the packetsize
1921 
1922  if ( first + num * 1.5 >= last ) {
1923  num = last - first;
1924  file->SetDone(); // done
1925  // Delete file from active list (unalloc list is single pass, no delete needed)
1926  RemoveActive(file);
1927  }
1928 
1929  // Update NextEntry in the file object
1930  file->MoveNextEntry(num);
1931 
1932  slstat->fCurElem = CreateNewPacket(base, first, num);
1933  if (base->GetEntryList())
1934  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1935 
1936  // Flag the first packet of a new run (dataset)
1937  if (firstPacket)
1938  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1939  else
1940  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1941 
1942  PDB(kPacketizer,2)
1943  Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1944 
1945  return slstat->fCurElem;
1946 }
1947 
1948 ////////////////////////////////////////////////////////////////////////////////
1949 /// Return the number of workers still processing
1950 
1952 {
1953  Int_t actw = 0;
1954  TIter nxw(fSlaveStats);
1955  TObject *key;
1956  while ((key = nxw())) {
1957  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1958  if (wrkstat && wrkstat->fCurFile) actw++;
1959  }
1960  // Done
1961  return actw;
1962 }
1963 
1964 ////////////////////////////////////////////////////////////////////////////////
1965 /// Get Estimation of the current rate; just summing the current rates of
1966 /// the active workers
1967 
1969 {
1970  all = kTRUE;
1971  // Loop over the workers
1972  Float_t currate = 0.;
1973  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1974  TIter nxw(fSlaveStats);
1975  TObject *key;
1976  while ((key = nxw()) != 0) {
1977  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1978  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1979  // Sum-up the current rates
1980  currate += slstat->GetProgressStatus()->GetCurrentRate();
1981  } else {
1982  all = kFALSE;
1983  }
1984  }
1985  }
1986  // Done
1987  return currate;
1988 }
1989 
1990 ////////////////////////////////////////////////////////////////////////////////
1991 /// Get estimation for the number of processed entries and bytes read at time t,
1992 /// based on the numbers already processed and the latests worker measured speeds.
1993 /// If t <= 0 the current time is used.
1994 /// Only the estimation for the entries is currently implemented.
1995 /// This is needed to smooth the instantaneous rate plot.
1996 
1998  Long64_t &bytes, Long64_t &calls)
1999 {
2000  // Default value
2001  ent = GetEntriesProcessed();
2002  bytes = GetBytesRead();
2003  calls = GetReadCalls();
2004 
2005  // Parse option
2006  if (fUseEstOpt == kEstOff)
2007  // Do not use estimation
2008  return 0;
2009  Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
2010 
2011  TTime tnow = gSystem->Now();
2012  Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2013  Double_t dt = -1;
2014 
2015  // Loop over the workers
2016  Bool_t all = kTRUE;
2017  Float_t trate = 0.;
2018  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2019  ent = 0;
2020  TIter nxw(fSlaveStats);
2021  TObject *key;
2022  while ((key = nxw()) != 0) {
2023  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2024  if (slstat) {
2025  // Those surely processed
2026  Long64_t e = slstat->GetEntriesProcessed();
2027  if (e <= 0) all = kFALSE;
2028  // Time elapsed since last update
2029  dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2030  // Add estimated entries processed since last update
2031  Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2032  : slstat->GetAvgRate();
2033  trate += rate;
2034  // Add estimated entries processed since last update
2035  e += (Long64_t) (dt * rate);
2036  // Add to the total
2037  ent += e;
2038  // Notify
2039  PDB(kPacketizer,3)
2040  Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2041  slstat->fSlave->GetOrdinal(),
2042  slstat->GetEntriesProcessed(), rate, dt, e);
2043  }
2044  }
2045  }
2046  // Notify
2047  dt = now - fProgressStatus->GetLastUpdate();
2048  PDB(kPacketizer,2)
2049  Info("GetEstEntriesProcessed",
2050  "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2051  dt, ent, GetEntriesProcessed(), bytes, trate, all);
2052 
2053  // Check values
2054  ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2055  ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2056  bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2057 
2058  // Done
2059  return ((all) ? 0 : 1);
2060 }
2061 
2062 ////////////////////////////////////////////////////////////////////////////////
2063 /// This method can be called at any time during processing
2064 /// as an effect of handling kPROOF_STOPPROCESS
2065 /// If the output list from this worker is going to be sent back to the master,
2066 /// the 'status' includes the number of entries processed by the slave.
2067 /// From this we calculate the remaining part of the packet.
2068 /// 0 indicates that the results from that worker were lost completely.
2069 /// Assume that the filenodes for which we have a TFileNode object
2070 /// are still up and running.
2071 
2073  TList **listOfMissingFiles)
2074 {
2075  TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2076  if (!slaveStat) {
2077  Error("MarkBad", "Worker does not exist");
2078  return;
2079  }
2080  // Update worker counters
2081  if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2082  slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2083  slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2084  }
2085 
2086  // If status is defined, the remaining part of the last packet is
2087  // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
2088  if (!status) {
2089  // Get the subset processed by the bad worker.
2090  TList *subSet = slaveStat->GetProcessedSubSet();
2091  if (subSet) {
2092  // Take care of the current packet
2093  if (slaveStat->fCurElem) {
2094  subSet->Add(slaveStat->fCurElem);
2095  }
2096  // Merge overlapping or subsequent elements
2097  Int_t nmg = 0, ntries = 100;
2098  TDSetElement *e = 0, *enxt = 0;
2099  do {
2100  nmg = 0;
2101  e = (TDSetElement *) subSet->First();
2102  while ((enxt = (TDSetElement *) subSet->After(e))) {
2103  if (e->MergeElement(enxt) >= 0) {
2104  nmg++;
2105  subSet->Remove(enxt);
2106  delete enxt;
2107  } else {
2108  e = enxt;
2109  }
2110  }
2111  } while (nmg > 0 && --ntries > 0);
2112  // reassign the packets assigned to the bad slave and save the size;
2113  SplitPerHost(subSet, listOfMissingFiles);
2114  // the elements were reassigned so should not be deleted
2115  subSet->SetOwner(0);
2116  } else {
2117  Warning("MarkBad", "subset processed by bad worker not found!");
2118  }
2119  (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2120  }
2121  // remove slavestat from the map
2122  fSlaveStats->Remove(s);
2123  delete slaveStat;
2124  // recalculate fNEventsOnRemLoc and others
2125  InitStats();
2126 }
2127 
2128 ////////////////////////////////////////////////////////////////////////////////
2129 /// The file in the listOfMissingFiles can appear several times;
2130 /// in order to fix that, a TDSetElement::Merge method is needed.
2131 
2133  TList **listOfMissingFiles)
2134 {
2135  if (!e) {
2136  Error("ReassignPacket", "empty packet!");
2137  return -1;
2138  }
2139  // Check the old filenode
2140  TUrl url = e->GetFileName();
2141  // Check the host from which 'e' was previously read.
2142  // Map non URL filenames to dummy host
2143  TString host;
2144  if ( !url.IsValid() ||
2145  (strncmp(url.GetProtocol(),"root", 4) &&
2146  strncmp(url.GetProtocol(),"rfio", 4))) {
2147  host = "no-host";
2148  } else {
2149  host = url.GetHost();
2150  }
2151 
2152  // If accessible add it back to the old node
2153  // and do DecProcessed
2154  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2155  if (node && fTryReassign) {
2156  // The packet 'e' was processing data from this node.
2157  node->DecreaseProcessed(e->GetNum());
2158  // The file should be already in fFilesToProcess ...
2159  node->Add(e, kFALSE);
2160  if (!fUnAllocated->FindObject(node))
2161  fUnAllocated->Add(node);
2162  return 0;
2163  } else {
2164  // Add to the list of missing files
2165  TFileInfo *fi = e->GetFileInfo();
2166  if (listOfMissingFiles && *listOfMissingFiles)
2167  (*listOfMissingFiles)->Add((TObject *)fi);
2168  return -1;
2169  }
2170 }
2171 
2172 ////////////////////////////////////////////////////////////////////////////////
2173 /// Split into per host entries
2174 /// The files in the listOfMissingFiles can appear several times;
2175 /// in order to fix that, a TDSetElement::Merge method is needed.
2176 
2178  TList **listOfMissingFiles)
2179 {
2180  if (!elements) {
2181  Error("SplitPerHost", "Empty list of packets!");
2182  return;
2183  }
2184  if (elements->GetSize() <= 0) {
2185  Error("SplitPerHost", "The input list contains no elements");
2186  return;
2187  }
2188  TIter subSetIter(elements);
2189  TDSetElement *e;
2190  while ((e = (TDSetElement*) subSetIter.Next())) {
2191  if (ReassignPacket(e, listOfMissingFiles) == -1) {
2192  // Remove from the list in order to delete it.
2193  if (elements->Remove(e))
2194  Error("SplitPerHost", "Error removing a missing file");
2195  delete e;
2196  }
2197 
2198  }
2199 }
const char * GetHost() const
Definition: TUrl.h:76
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
An array of TObjects.
Definition: TObjArray.h:39
const char * GetOrdinal() const
Definition: TSlave.h:135
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:234
void Reset()
Reset the internal data structure for packet distribution.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:432
void Add(THist< DIMENSION, PRECISIONA > &to, THist< DIMENSION, PRECISIONB > &from)
Definition: THist.h:335
long long Long64_t
Definition: RtypesCore.h:69
TSocket * GetSocket() const
Definition: TSlave.h:138
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
Ssiz_t Length() const
Definition: TString.h:390
Collectable string class.
Definition: TObjString.h:32
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
virtual Bool_t IsValid() const
Definition: TSocket.h:162
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:131
This class represents a WWW compatible URL.
Definition: TUrl.h:41
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
Definition: TDSet.h:153
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
const char * GetProtocol() const
Definition: TUrl.h:73
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Long64_t GetBytesRead() const
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Double_t GetLastUpdate() const
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:52
#define R__ASSERT(e)
Definition: TError.h:98
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Basic string class.
Definition: TString.h:137
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
Definition: TDSet.cxx:264
Basic time type with millisecond precision.
Definition: TTime.h:29
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
Bool_t GetValid() const
Definition: TDSet.h:121
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:310
void Reset()
Definition: TCollection.h:161
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
virtual ~TPacketizerAdaptive()
Destructor.
Int_t GetActiveWorkers()
Return the number of workers still processing.
TFileNode * NextNode()
Get next node which has unallocated files.
Double_t GetProcTime() const
const char * GetObjName() const
Definition: TDSet.h:122
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:770
virtual Int_t GetN() const
Definition: TEventList.h:58
Long64_t GetNum() const
Definition: TDSet.h:116
const char * Data() const
Definition: TString.h:349
Double_t GetCPUTime() const
#define SafeDelete(p)
Definition: RConfig.h:436
Double_t GetCumProcTime() const
Long64_t GetTDSetOffset() const
Definition: TDSet.h:130
Bool_t IsTree() const
Definition: TDSet.h:225
TFileStat * GetNextActive()
Get next active file.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:288
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
#define PDB(mask, level)
Definition: TProofDebug.h:58
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
Long64_t GetBytesRead() const
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
virtual void AddAll(const TCollection *col)
TList * GetListOfElements() const
Definition: TDSet.h:231
A sorted doubly linked list.
Definition: TSortedList.h:30
std::vector< std::vector< double > > Data
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; a meaningful difference from TPacketizer is the fact that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
TProofProgressStatus * fProgressStatus
Long64_t GetFirst() const
Definition: TDSet.h:114
Bool_t IsValid() const
Definition: TUrl.h:88
virtual Bool_t IsSortable() const
Definition: TObject.h:139
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:594
A doubly linked list.
Definition: TList.h:47
static const char * what
Definition: stlLoader.cc:6
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
void SetLastEntries(Long64_t entries)
const int nEvents
Definition: testRooFit.cxx:42
Named parameter, streamable and storable.
Definition: TParameter.h:49
TSocket * GetSocket() const
Definition: TProofServ.h:269
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:467
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9903
const char * GetFileName() const
Definition: TDSet.h:113
TString GetString() const
Definition: TObjString.h:50
ROOT::R::TRInterface & r
Definition: Object.C:4
Float_t GetProcTime() const
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
Definition: TMap.cxx:294
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:187
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:674
TObject * Next()
Definition: TCollection.h:158
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
unsigned int UInt_t
Definition: RtypesCore.h:42
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
TMarker * m
Definition: textangle.C:8
char * Form(const char *fmt,...)
TObject * GetEntryList() const
Definition: TDSet.h:133
void Invalidate()
Definition: TDSet.h:136
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Long64_t GetReadCalls() const
Bool_t IsNull() const
Definition: TString.h:387
void Reset(Detail::TBranchProxy *x)
Long64_t GetEntries() const
Int_t MergeElement(TDSetElement *elem)
Check if 'elem' is overlapping or subsequent and, if the case, return a merged element.
Definition: TDSet.cxx:185
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2240
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:149
#define Printf
Definition: TGeoToOCC.h:18
#define gPerfStats
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:580
const char * GetDataSet() const
Definition: TDSet.h:124
void SetHost(const char *host)
Definition: TUrl.h:93
long Long_t
Definition: RtypesCore.h:50
Int_t GetPerfIdx() const
Definition: TSlave.h:136
R__EXTERN TProof * gProof
Definition: TProof.h:1110
virtual Int_t GetSize() const
Definition: TCollection.h:95
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
void SetValid()
Definition: TDSet.h:137
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
ClassImp(TPacketizerAdaptive) TPacketizerAdaptive
Constructor.
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
TSortedList * fFilesToProcess
Definition: TProof.h:339
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
void SetNum(Long64_t num)
Definition: TDSet.h:120
UInt_t What() const
Definition: TMessage.h:80
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:348
Int_t GetProtocol() const
Definition: TSlave.h:137
virtual Long64_t GetN() const
Definition: TEntryList.h:77
Mother of all ROOT objects.
Definition: TObject.h:58
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:556
Int_t BufferSize() const
Definition: TBuffer.h:92
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:359
virtual void Add(TObject *obj)
Definition: TList.h:81
Int_t Length() const
Definition: TBuffer.h:94
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
Double_t GetRate() const
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:218
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:167
const char * GetType() const
Definition: TDSet.h:228
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
TProofProgressStatus * GetProgressStatus()
Definition: TSlave.h:50
const Bool_t kTRUE
Definition: Rtypes.h:91
void SetFirst(Long64_t first)
Definition: TDSet.h:115
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
TObject * obj
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const Int_t n
Definition: legend1.C:16
const char * GetFile() const
Definition: TUrl.h:78
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:256
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904