Logo ROOT   6.08/07
Reference Guide
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Long Tran-Thanh 22/07/07
3 // Revised: G. Ganis, May 2011
5 /*************************************************************************
6  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
13 /** \class TPacketizerUnit
14 \ingroup proofkernel
16 This packetizer generates packets of generic units, representing the
17 number of times an operation cycle has to be repeated by the worker
18 node, e.g. the number of Monte Carlo events to be generated.
19 Packet sizes are generated taking into account the performance of
20 worker nodes, based on the time needed to process previous packets,
21 with the goal of having all workers ending at the same time.
23 */
26 #include "TPacketizerUnit.h"
28 #include "Riostream.h"
29 #include "TDSet.h"
30 #include "TError.h"
31 #include "TEventList.h"
32 #include "TMap.h"
33 #include "TMessage.h"
34 #include "TMonitor.h"
35 #include "TNtupleD.h"
36 #include "TObject.h"
37 #include "TParameter.h"
38 #include "TPerfStats.h"
39 #include "TProofDebug.h"
40 #include "TProof.h"
41 #include "TProofPlayer.h"
42 #include "TProofServ.h"
43 #include "TSlave.h"
44 #include "TSocket.h"
45 #include "TStopwatch.h"
46 #include "TTimer.h"
47 #include "TUrl.h"
48 #include "TClass.h"
49 #include "TMath.h"
50 #include "TObjString.h"
53 using namespace TMath;
54 //
55 // The following utility class manages the state of the
56 // work to be performed and the slaves involved in the process.
57 //
58 // The list of TSlaveStat(s) keeps track of the work (being) done
59 // by each slave
60 //
62 //------------------------------------------------------------------------------
/// Per-worker bookkeeping record used by TPacketizerUnit: it stores the size of
/// the last packet assigned to the worker, the estimated processing rate and
/// the circular ntuple from which that rate is computed.
/// TPacketizerUnit is a friend and accesses the private members directly.
/// NOTE(review): the listing skips a line between UpdatePerformance() and the
/// commented-out ClassDef, while a body for AddProcessed() appears further down
/// in this file; presumably its declaration is what is missing here -- confirm
/// against the original source before relying on this declaration.
64 class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
66 friend class TPacketizerUnit;
68 private:
69  Long64_t fLastProcessed; // Number of entries in the last packet assigned to this worker
70  Double_t fRate; // Estimated processing rate (entries per unit time) computed from fCircNtp
71  Double_t fTimeInstant; // Stopwatch time at which the current packet was assigned
72  TNtupleD *fCircNtp; // Circular ntuple with columns "tm:ev" used for the rate calculation
73  Long_t fCircLvl; // Circularity level of fCircNtp (defaults to 5)
75 public:
      // Creates the circular ntuple and the progress-status object for worker 'sl';
      // 'input' is searched for the "PROOF_TPacketizerUnitCircularity" parameter.
76  TSlaveStat(TSlave *sl, TList *input);
      // Deletes the circular ntuple.
77  ~TSlaveStat();
79 // void GetCurrentTime();
      // Adds a measurement ('time' spent on the last packet) to the circular
      // ntuple and refreshes fRate.
81  void UpdatePerformance(Double_t time);
84 // ClassDef(TPacketizerUnit::TSlaveStat, 0);
85 };
87 ////////////////////////////////////////////////////////////////////////////////
88 /// Main constructor
90 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
91  : fLastProcessed(0),
92  fRate(0), fTimeInstant(0), fCircLvl(5)
93 {
94  // Initialize the circularity ntple for speed calculations
95  fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
96  fCircNtp->SetDirectory(0);
97  TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
98  fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
99  fCircNtp->SetCircular(fCircLvl);
100  fSlave = slave;
101  fStatus = new TProofProgressStatus();
102 }
104 ////////////////////////////////////////////////////////////////////////////////
105 /// Destructor
107 TPacketizerUnit::TSlaveStat::~TSlaveStat()
108 {
109  SafeDelete(fCircNtp);
110 }
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Update the circular ntple
115 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
116 {
117  Double_t ttot = time;
118  Double_t *ar = fCircNtp->GetArgs();
119  Int_t ne = fCircNtp->GetEntries();
120  if (ne <= 0) {
121  // First call: just fill one ref entry and return
122  fCircNtp->Fill(0., 0);
123  fRate = 0.;
124  return;
125  }
126  // Fill the entry
127  fCircNtp->GetEntry(ne-1);
128  ttot = ar[0] + time;
129  fCircNtp->Fill(ttot, GetEntriesProcessed());
131  // Calculate the speed
132  fCircNtp->GetEntry(0);
133  Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
134  Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
135  fRate = nevts / dtime;
136  PDB(kPacketizer,2)
137  Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
138  time, dtime, nevts, fRate);
140 }
142 ////////////////////////////////////////////////////////////////////////////////
143 /// Update the status info to the 'st'.
144 /// return the difference (*st - *fStatus)
147 {
148  if (st) {
149  // The number of entries is not correct in 'st'
150  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
151  // The last proc time should not be added
152  fStatus->SetLastProcTime(0.);
153  // Get the diff
154  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
155  *fStatus += *diff;
156  // Set the correct value
157  fStatus->SetLastEntries(lastEntries);
158  return diff;
159  } else {
160  Error("AddProcessed", "status arg undefined");
161  return 0;
162  }
163 }
165 //------------------------------------------------------------------------------
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Constructor
174  : TVirtualPacketizer(input, st)
175 {
176  PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
178  // Init pointer members
179  fWrkStats = 0;
180  fPackets = 0;
181  fInput = input;
183  fFixedNum = kFALSE;
184  Int_t fixednum = -1;
185  if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
186  fFixedNum = kFALSE;
187  }
188  else {
189  Info("TPacketizerUnit", "forcing the same cycles on each worker");
190  fFixedNum = kTRUE;
191  }
193  fCalibFrac = 0.01;
194  if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
195  fCalibFrac = 0.01;
196  PDB(kPacketizer,1)
197  Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
199  fMaxPacketTime = 3.;
200  Double_t timeLimit = -1;
201  if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
202  fMaxPacketTime = timeLimit;
203  Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
204  }
205  PDB(kPacketizer,1)
206  Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
208  // Different default for min packet time
209  fMinPacketTime = 1;
210  Double_t minPacketTime = 0;
211  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
212  TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
213  if (mpt) {
214  mpt->SetVal(fMinPacketTime);
215  } else {
216  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
217  }
219  fProcessing = 0;
220  fAssigned = 0;
221  fPacketSeq = 0;
223  fStopwatch = new TStopwatch();
225  fPackets = new TList;
226  fPackets->SetOwner();
228  fWrkStats = new TMap;
230  fWrkExcluded = 0;
232  TSlave *slave;
233  TIter si(slaves);
234  while ((slave = (TSlave*) si.Next())) {
235  if (slave->GetParallel() > 0) {
236  fWrkStats->Add(slave, new TSlaveStat(slave, input));
237  } else {
238  if (!fWrkExcluded) {
239  fWrkExcluded = new TList;
241  }
242  PDB(kPacketizer,2)
243  Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
244  fWrkExcluded->Add(slave);
245  }
246  }
248  fTotalEntries = 0;
249  fNumPerWorker = -1;
250  if (num > 0 && AssignWork(0,0,num) != 0)
251  Warning("TPacketizerUnit", "some problems assigning work");
253  // Save the config parameters in the dedicated list so that they will be saved
254  // in the outputlist and therefore in the relevant TQueryResult
255  fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
257  fStopwatch->Start();
258  PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
259 }
261 ////////////////////////////////////////////////////////////////////////////////
262 /// Assign work to be done to this packetizer
265 {
266  if (num < 0) {
267  Error("AssignWork", "assigned a negative number (%lld) of cycles - protocol error?", num);
268  return -1;
269  }
271  fTotalEntries += num;
272  PDB(kPacketizer,1)
273  Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
275  // Update fixed number counter
276  if (fFixedNum && fWrkStats->GetSize() > 0) {
277  // Approximate number: the exact number is determined in GetNextPacket
279  if (fNumPerWorker == 0) fNumPerWorker = 1;
280  }
282  // Update/Save the config parameters in the dedicated list so that they will be saved
283  // in the outputlist and therefore in the relevant TQueryResult
285  (TParameter<Long64_t> *) fConfigParams->FindObject("PROOF_PacketizerFixedNum");
286  if (fn) {
287  fn->SetVal(fNumPerWorker);
288  } else {
289  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
290  }
292  // Done
293  return 0;
294 }
296 ////////////////////////////////////////////////////////////////////////////////
297 /// Destructor.
300 {
301  if (fWrkStats)
307 }
309 ////////////////////////////////////////////////////////////////////////////////
310 /// Get current time
313 {
314  Double_t retValue = fStopwatch->RealTime();
315  fStopwatch->Continue();
316  return retValue;
317 }
319 ////////////////////////////////////////////////////////////////////////////////
320 /// Get Estimation of the current rate; just summing the current rates of
321 /// the active workers
324 {
325  all = kTRUE;
326  // Loop over the workers
327  Float_t currate = 0.;
328  if (fWrkStats && fWrkStats->GetSize() > 0) {
329  TIter nxw(fWrkStats);
330  TObject *key;
331  while ((key = nxw()) != 0) {
332  TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
333  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
334  // Sum-up the current rates
335  currate += slstat->GetProgressStatus()->GetCurrentRate();
336  } else {
337  all = kFALSE;
338  }
339  }
340  }
341  // Done
342  return currate;
343 }
345 ////////////////////////////////////////////////////////////////////////////////
346 /// Get next packet
349 {
350  if (!fValid)
351  return 0;
353  // Find slave
354  TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
355  if (!slstat) {
356  Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s",
357  sl->GetName(), sl->GetOrdinal());
358  return 0;
359  }
361  PDB(kPacketizer,2)
362  Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
364  // Update stats & free old element
365  Double_t latency = 0., proctime = 0., proccpu = 0.;
366  Long64_t bytesRead = -1;
367  Long64_t totalEntries = -1; // used only to read an old message type
368  Long64_t totev = 0;
369  Long64_t numev = -1;
371  TProofProgressStatus *status = 0;
372  if (sl->GetProtocol() > 18) {
373  (*r) >> latency;
374  (*r) >> status;
376  // Calculate the progress made in the last packet
377  TProofProgressStatus *progress = 0;
378  if (status) {
379  // update the worker status
380  numev = status->GetEntries() - slstat->GetEntriesProcessed();
381  progress = slstat->AddProcessed(status);
382  if (progress) {
383  // (*fProgressStatus) += *progress;
384  proctime = progress->GetProcTime();
385  proccpu = progress->GetCPUTime();
386  totev = status->GetEntries(); // for backward compatibility
387  bytesRead = progress->GetBytesRead();
388  delete progress;
389  }
390  delete status;
391  } else
392  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
393  } else {
395  (*r) >> latency >> proctime >> proccpu;
397  // only read new info if available
398  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
399  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
400  if (r->BufferSize() > r->Length()) (*r) >> totev;
402  numev = totev - slstat->GetEntriesProcessed();
403  slstat->GetProgressStatus()->IncEntries(numev);
404  slstat->GetProgressStatus()->SetLastUpdate();
405  }
407  fProgressStatus->IncEntries(numev);
410  fProcessing = 0;
412  PDB(kPacketizer,2)
413  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
414  sl->GetOrdinal(), sl->GetName(),
415  numev, latency, proctime, proccpu, bytesRead);
417  if (gPerfStats != 0) {
418  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
419  latency, proctime, proccpu, bytesRead);
420  }
422  if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
423  PDB(kPacketizer,2)
424  Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
425  sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
426  return 0;
427  }
429  if (fAssigned == fTotalEntries) {
430  Bool_t done = kTRUE;
431  // If we are on a submaster, check if there is something else to do
434  if (nxe) {
435  if (AssignWork(0,0,nxe->GetNum()) == 0) {
436  if (fAssigned < fTotalEntries) done = kFALSE;
437  } else {
438  Error("GetNextPacket", "problems assigning additional work: stop");
439  }
440  SafeDelete(nxe);
441  }
442  }
443  if (done) {
444  // Send last timer message
445  HandleTimer(0);
446  return 0;
447  }
448  }
450  if (fStop) {
451  // Send last timer message
452  HandleTimer(0);
453  return 0;
454  }
457  Long64_t num;
459  // Get the current time
460  Double_t cTime = GetCurrentTime();
462  if (slstat->fCircNtp->GetEntries() <= 0) {
463  // The calibration phase
465  num = (Long64_t) (fCalibFrac * avg);
466  if (num < 1) num = (avg >= 1) ? avg : 1;
467  PDB(kPacketizer,2)
468  Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
469  fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
471  // Create a reference entry
472  slstat->UpdatePerformance(0.);
474  } else {
476  if (fNumPerWorker < 0) {
478  // Schedule tasks for workers based on the currently estimated processing speeds
480  // Update performances
481  // slstat->fStatus was updated before;
482  slstat->UpdatePerformance(proctime);
484  // We need to estimate the total instantaneous rate: for the workers not having yet
485  // one we assume the average of those having a measurement
486  // The optimal number for worker j is
487  //
488  // n_j = r_j / Sum r_i * N_left
489  //
491  Int_t nrm = 0;
492  Double_t sumRate = 0.;
493  TIter nxwrk(fWrkStats);
494  TSlaveStat *wrkStat = 0;
495  TSlave *tmpWrk = 0;
496  while ((tmpWrk = (TSlave *)nxwrk())) {
497  if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
498  if (wrkStat->fRate > 0) {
499  nrm++;
500  sumRate += wrkStat->fRate;
501  }
502  PDB(kPacketizer,3)
503  Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
504  nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
505  } else {
506  Warning("GetNextPacket", "dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
507  tmpWrk->GetName(), tmpWrk->GetOrdinal());
508  }
509  }
511  // Check consistency
512  if (nrm <= 0) {
513  Error("GetNextPacket", "no worker has consistent information: stop processing!");
514  return (TDSetElement *)0;
515  }
517  Double_t avgRate = sumRate / nrm;
518  // Check if all workers had meaningful rate information
519  if (nrm < fWrkStats->GetSize()) {
520  // For some workers the measurement is missing: use the average
521  sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
522  }
523  PDB(kPacketizer,2)
524  Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
525  avgRate, sumRate, nrm, fWrkStats->GetSize());
527  // Packet size for this worker
528  Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
529  num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
530  PDB(kPacketizer,2)
531  Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
533  // Apply time-per-packet limits
534  Double_t packTime = num / wrkRate;
535  if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
536  num = (Long64_t) (fMaxPacketTime * wrkRate) ;
537  packTime = fMaxPacketTime;
538  PDB(kPacketizer,2)
539  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
540  sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
541  }
542  if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
543  num = (Long64_t) (fMinPacketTime * wrkRate);
544  PDB(kPacketizer,2)
545  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
546  sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
547  }
549  } else {
550  // Fixed number of cycles per worker
551  num = fNumPerWorker - slstat->fLastProcessed;
552  if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
553  num = (Long64_t) (slstat->fRate * fMaxPacketTime);
554  }
555  }
556  }
557  // Minimum packet size
558  num = (num > 1) ? num : 1;
559  fProcessing = (num < (fTotalEntries - fAssigned)) ? num
560  : (fTotalEntries - fAssigned);
562  // Set the information of the current slave
563  slstat->fLastProcessed = fProcessing;
564  // Set the start time of the current packet
565  slstat->fTimeInstant = cTime;
567  // Update the sequential number
568  fPacketSeq++;
569  TString sseq = TString::Format("p%lld", fPacketSeq);
571  PDB(kPacketizer,2)
572  Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
574  TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
577  // Update the total counter
578  fAssigned += slstat->fLastProcessed;
580  return elem;
581 }
583 ////////////////////////////////////////////////////////////////////////////////
584 /// Adds new workers. Returns the total number of workers known to the packetizer after the addition, or -1 on failure.
587 {
588  if (!workers) {
589  Error("AddWorkers", "Null list of new workers!");
590  return -1;
591  }
593  Int_t curNumOfWrks = fWrkStats->GetEntries();
595  TSlave *sl;
596  TIter next(workers);
597  while (( sl = dynamic_cast<TSlave*>(next()) ))
598  fWrkStats->Add(sl, new TSlaveStat(sl, fInput));
600  fNumPerWorker = -1;
601  if (fFixedNum && fWrkStats->GetSize() > 0) {
602  // Approximate number: the exact number is determined in GetNextPacket
603  fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
604  if (fNumPerWorker == 0) fNumPerWorker = 1;
605  }
607  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
609  return fWrkStats->GetEntries();
610 }
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
long long Long64_t
Definition: RtypesCore.h:69
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
float Float_t
Definition: RtypesCore.h:53
Int_t GetParallel() const
Definition: TSlave.h:145
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Double_t GetProcTime() const
virtual Int_t GetEntries() const
Definition: TCollection.h:92
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
Definition: TMap.cxx:53
void SetVal(const AParamType &val)
Definition: TParameter.h:79
const char * GetOrdinal() const
Definition: TSlave.h:135
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
Bool_t IsTopMaster() const
Definition: TProofServ.h:309
Long64_t GetEntries() const
Int_t GetProtocol() const
Definition: TSlave.h:137
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:739
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
Int_t Length() const
Definition: TBuffer.h:96
Manages an element of a TDSet.
Definition: TDSet.h:68
#define SafeDelete(p)
Definition: RConfig.h:507
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
Long64_t GetNum() const
Definition: TDSet.h:116
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2335
#define PDB(mask, level)
Definition: TProofDebug.h:58
Long64_t fProcessing
Int_t AssignWork(TDSet *, Long64_t, Long64_t num)
Assign work to be done to this packetizer.
void IncEntries(Long64_t entries=1)
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:93
This packetizer generates packets of generic units, representing the number of times an operation cyc...
TProofProgressStatus * fProgressStatus
A doubly linked list.
Definition: TList.h:47
void SetLastEntries(Long64_t entries)
Named parameter, streamable and storable.
Definition: TParameter.h:49
Long64_t fNumPerWorker
TRandom2 r(17)
virtual Int_t AssignWork(TDSet *, Long64_t, Long64_t)
TObject * Next()
Definition: TCollection.h:158
virtual ~TPacketizerUnit()
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:150
#define gPerfStats
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
TStopwatch * fStopwatch
#define ClassImp(name)
Definition: Rtypes.h:279
double f(double x)
double Double_t
Definition: RtypesCore.h:55
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
Int_t BufferSize() const
Definition: TBuffer.h:94
Mother of all ROOT objects.
Definition: TObject.h:37
Long64_t GetBytesRead() const
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:361
virtual void Add(TObject *obj)
Definition: TList.h:81
Int_t AddProcessed(TSlave *wrk, TProofProgressStatus *st, Double_t lat, TList **missing)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
virtual Int_t GetSize() const
Definition: TCollection.h:95
Class describing a PROOF worker server.
Definition: TSlave.h:50
Container class for processing statistics.
const Bool_t kTRUE
Definition: Rtypes.h:91
Bool_t IsMaster() const
Definition: TProofServ.h:307
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911
Double_t GetCurrentTime()
Get current time.
Stopwatch class.
Definition: TStopwatch.h:30