ROOT  6.06/09
Reference Guide
TPacketizerFile.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: G. Ganis 2009
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TPacketizerFile //
15 // //
16 // This packetizer generates packets which contain a single file path //
17 // to be used in process. Used for tasks generating files, like in //
18 // PROOF bench. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "TPacketizerFile.h"
23 
24 #include "Riostream.h"
25 #include "TDSet.h"
26 #include "TError.h"
27 #include "TEventList.h"
28 #include "TMap.h"
29 #include "TMessage.h"
30 #include "TMonitor.h"
31 #include "TNtupleD.h"
32 #include "TObject.h"
33 #include "TParameter.h"
34 #include "TPerfStats.h"
35 #include "TProofDebug.h"
36 #include "TProof.h"
37 #include "TProofPlayer.h"
38 #include "TProofServ.h"
39 #include "TSlave.h"
40 #include "TSocket.h"
41 #include "TStopwatch.h"
42 #include "TTimer.h"
43 #include "TUrl.h"
44 #include "TClass.h"
45 #include "TMath.h"
46 #include "TObjString.h"
47 #include "TFileInfo.h"
48 #include "TFileCollection.h"
49 #include "THashList.h"
50 
51 //------------------------------------------------------------------------------
52 
53 class TPacketizerFile::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
54 
55 friend class TPacketizerFile;
56 
57 private:
58  Long64_t fLastProcessed; // number of processed entries of the last packet
59  Double_t fSpeed; // estimated current average speed of the processing slave
60  Double_t fTimeInstant; // stores the time instant when the current packet started
61  TNtupleD *fCircNtp; // Keeps circular info for speed calculations
62  Long_t fCircLvl; // Circularity level
63 
64 public:
65  TSlaveStat(TSlave *sl, TList *input);
66  ~TSlaveStat();
67 
68  void GetCurrentTime();
69 
70  void UpdatePerformance(Double_t time);
72 };
73 
74 // Iterator wrapper
75 class TPacketizerFile::TIterObj : public TObject {
76 
77 private:
78  TString fName; // Name of reference
79  TIter *fIter; // Iterator
80 
81 public:
82  TIterObj(const char *n, TIter *iter) : fName(n), fIter(iter) { }
83  virtual ~TIterObj() { if (fIter) delete fIter; }
84 
85  const char *GetName() const {return fName;}
86  TIter *GetIter() const {return fIter;}
87  void Print(Option_t* option = "") const;
88 };
89 
91 
92 ////////////////////////////////////////////////////////////////////////////////
93 /// Constructor
94 
97  : TVirtualPacketizer(input, st)
98 {
99  PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
100  ResetBit(TObject::kInvalidObject);
101  fValid = kFALSE;
102  fAssigned = 0;
103  fProcNotAssigned = kTRUE;
104  fAddFileInfo = kFALSE;
105 
106  if (!input || (input && input->GetSize() <= 0)) {
107  Error("TPacketizerFile", "input file is undefined or empty!");
108  SetBit(TObject::kInvalidObject);
109  return;
110  }
111 
112  // Check if the files not explicitly assigned have to be processed
113  Int_t procnotass = 1;
114  if (TProof::GetParameter(input, "PROOF_ProcessNotAssigned", procnotass) == 0) {
115  if (procnotass == 0) {
116  Info("TPacketizerFile", "files not assigned to workers will not be processed");
117  fProcNotAssigned = kFALSE;
118  }
119  }
120 
121  // Check if the TFileInfo object has to be added to the packet
122  Int_t addfileinfo = 0;
123  if (TProof::GetParameter(input, "PROOF_IncludeFileInfoInPacket", addfileinfo) == 0) {
124  if (addfileinfo == 1) {
125  Info("TPacketizerFile",
126  "TFileInfo object will be included in the packet as associated object");
127  fAddFileInfo = kTRUE;
128  }
129  }
130 
131  // These are the file to be created/processed per node; the information
132  if (!(fFiles = dynamic_cast<TMap *>(input->FindObject("PROOF_FilesToProcess")))) {
133  Error("TPacketizerFile", "map of files to be processed/created not found");
134  SetBit(TObject::kInvalidObject);
135  return;
136  }
137 
138  // The worker stats
139  fSlaveStats = new TMap;
140  fSlaveStats->SetOwner(kFALSE);
141 
142  TList nodes;
143  nodes.SetOwner(kTRUE);
144  TSlave *wrk;
145  TIter si(workers);
146  while ((wrk = (TSlave *) si.Next())) {
147  fSlaveStats->Add(wrk, new TSlaveStat(wrk, input));
148  TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
149  Info("TPacketizerFile", "worker: %s", wrkname.Data());
150  if (!nodes.FindObject(wrkname)) nodes.Add(new TObjString(wrkname));
151  }
152 
153  // The list of iterators
154  fIters = new TList;
155  fIters->SetOwner(kTRUE);
156 
157  // There must be something in
158  fTotalEntries = 0;
159  fNotAssigned = new TList;
160  fNotAssigned->SetName("*");
161  TIter nxl(fFiles);
162  TObject *key, *o = 0;
163  while ((key = nxl()) != 0) {
164  THashList *wrklist = dynamic_cast<THashList *>(fFiles->GetValue(key));
165  if (!wrklist) {
166  TFileCollection *fc = dynamic_cast<TFileCollection *>(fFiles->GetValue(key));
167  if (fc) wrklist = fc->GetList();
168  }
169  if (wrklist) {
170  TString hname = TUrl(key->GetName()).GetHostFQDN();
171  if ((o = nodes.FindObject(hname))) {
172  fTotalEntries += wrklist->GetSize();
173  fIters->Add(new TIterObj(hname, new TIter(wrklist)));
174  // Notify
175  PDB(kPacketizer,2)
176  Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
177  wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
178  } else {
179  // We add all to the not assigned list so that they will be distributed
180  // according to the load
181  TIter nxf(wrklist);
182  while ((o = nxf()))
183  fNotAssigned->Add(o);
184  // Notify
185  PDB(kPacketizer,2)
186  Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
187  wrklist->GetSize(), key->GetName(), hname.Data());
188  }
189  }
190  }
191  if (fNotAssigned && fNotAssigned->GetSize() > 0) {
192  fTotalEntries += fNotAssigned->GetSize();
193  fIters->Add(new TIterObj("*", new TIter(fNotAssigned)));
194  Info("TPacketizerFile", "non-assigned files: %d", fNotAssigned->GetSize());
195  fNotAssigned->Print();
196  }
197  if (fTotalEntries <= 0) {
198  Error("TPacketizerFile", "no file path in the map!");
199  SetBit(TObject::kInvalidObject);
200  SafeDelete(fIters);
201  return;
202  } else {
203  Info("TPacketizerFile", "processing %lld files", fTotalEntries);
204  fIters->Print();
205  }
206 
207  fStopwatch = new TStopwatch();
208  fStopwatch->Start();
209  fValid = kTRUE;
210  PDB(kPacketizer,1) Info("TPacketizerFile", "return");
211 
212  // Done
213  return;
214 }
215 
216 ////////////////////////////////////////////////////////////////////////////////
217 /// Destructor.
218 
220 {
223  if (fIters) fIters->SetOwner(kTRUE);
226 }
227 
228 ////////////////////////////////////////////////////////////////////////////////
229 /// Get current time
230 
232 {
233  Double_t retValue = fStopwatch->RealTime();
234  fStopwatch->Continue();
235  return retValue;
236 }
237 
238 ////////////////////////////////////////////////////////////////////////////////
239 /// Get Estimation of the current rate; just summing the current rates of
240 /// the active workers
241 
243 {
244  all = kTRUE;
245  // Loop over the workers
246  Float_t currate = 0.;
247  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
248  TIter nxw(fSlaveStats);
249  TObject *key;
250  while ((key = nxw()) != 0) {
251  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
252  if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
253  // Sum-up the current rates
254  currate += wrkstat->GetProgressStatus()->GetCurrentRate();
255  } else {
256  all = kFALSE;
257  }
258  }
259  }
260  // Done
261  return currate;
262 }
263 
264 ////////////////////////////////////////////////////////////////////////////////
265 /// Get next packet
266 
268 {
269  TDSetElement *elem = 0;
270  if (!fValid) return elem;
271 
272  // Find slave
273  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
274  if (!wrkstat) {
275  Error("GetNextPacket", "could not find stat object for worker '%s'!", wrk->GetName());
276  return elem;
277  }
278 
279  PDB(kPacketizer,2)
280  Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);
281 
282  // Update stats & free old element
283  Double_t latency = 0., proctime = 0., proccpu = 0.;
284  Long64_t bytesRead = -1;
285  Long64_t totalEntries = -1; // used only to read an old message type
286  Long64_t totev = 0;
287  Long64_t numev = -1;
288 
290  if (wrk->GetProtocol() > 18) {
291  (*r) >> latency;
292  (*r) >> status;
293 
294  // Calculate the progress made in the last packet
295  TProofProgressStatus *progress = 0;
296  if (status) {
297  // upadte the worker status
298  numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
299  progress = wrkstat->AddProcessed(status);
300  if (progress) {
301  // (*fProgressStatus) += *progress;
302  proctime = progress->GetProcTime();
303  proccpu = progress->GetCPUTime();
304  totev = status->GetEntries(); // for backward compatibility
305  bytesRead = progress->GetBytesRead();
306  delete progress;
307  }
308  delete status;
309  } else
310  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
311  } else {
312 
313  (*r) >> latency >> proctime >> proccpu;
314 
315  // only read new info if available
316  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
317  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
318  if (r->BufferSize() > r->Length()) (*r) >> totev;
319 
320  numev = totev - wrkstat->GetEntriesProcessed();
321  wrkstat->GetProgressStatus()->IncEntries(numev);
322  wrkstat->GetProgressStatus()->SetLastUpdate();
323  }
324 
325  fProgressStatus->IncEntries(numev);
327 
328  PDB(kPacketizer,2)
329  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
330  wrk->GetOrdinal(), wrk->GetName(),
331  numev, latency, proctime, proccpu, bytesRead);
332 
333  if (gPerfStats != 0) {
334  gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(), "", numev,
335  latency, proctime, proccpu, bytesRead);
336  }
337 
338  if (fAssigned == fTotalEntries) {
339  // Send last timer message
340  HandleTimer(0);
341  return 0;
342  }
343 
344  if (fStop) {
345  // Send last timer message
346  HandleTimer(0);
347  return 0;
348  }
349 
350  PDB(kPacketizer,2)
351  Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
352  wrk->GetName());
353 
354  // Get next file now
355  TObject *nextfile = 0;
356 
357  // Find iterator associated to the worker
358  TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
359  TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrkname));
360  if (io) {
361  // Get next file to process in the list of the worker
362  if (io->GetIter())
363  nextfile = io->GetIter()->Next();
364  }
365 
366  // If not found or all files already processed, check if a generic iterator
367  // has still some files to process
368  if (!nextfile && fProcNotAssigned) {
369  if ((io = dynamic_cast<TIterObj *>(fIters->FindObject("*")))) {
370  // Get next file to process in the list of the worker
371  if (io->GetIter())
372  nextfile = io->GetIter()->Next();
373  }
374  }
375 
376  // Return if nothing to process
377  if (!nextfile) return elem;
378 
379  // The file name: we support TObjString or TFileInfo
381  TObjString *os = 0;
382  TFileInfo *fi = 0;
383  if ((os = dynamic_cast<TObjString *>(nextfile))) {
384  filename = os->GetName();
385  } else {
386  if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
387  filename = fi->GetCurrentUrl()->GetUrl();
388  }
389  // Nothing to process
390  if (filename.IsNull()) {
391  Warning("GetNextPacket", "found unsupported object of type '%s' in list: it must"
392  " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
393  return elem;
394  }
395  // Prepare the packet
396  PDB(kPacketizer,2)
397  Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
398  wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));
399  elem = new TDSetElement(filename, "", "", 0, 1);
401 
402  // Add the element, if required
403  if (fAddFileInfo && fi) {
404  elem->AddAssocObj(fi);
405  PDB(kPacketizer,2) fi->Print("L");
406  }
407 
408  // Update the total counter
409  fAssigned += 1;
410 
411  return elem;
412 }
413 
414 //------------------------------------------------------------------------------
415 
416 ////////////////////////////////////////////////////////////////////////////////
417 /// Main constructor
418 
419 TPacketizerFile::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
420  : fLastProcessed(0),
421  fSpeed(0), fTimeInstant(0), fCircLvl(5)
422 {
423  // Initialize the circularity ntple for speed calculations
424  fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
425  TProof::GetParameter(input, "PROOF_TPacketizerFileCircularity", fCircLvl);
426  fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
427  fCircNtp->SetCircular(fCircLvl);
428  fSlave = slave;
429  fStatus = new TProofProgressStatus();
430 }
431 
432 ////////////////////////////////////////////////////////////////////////////////
433 /// Destructor
434 
435 TPacketizerFile::TSlaveStat::~TSlaveStat()
436 {
437  SafeDelete(fCircNtp);
438 }
439 
440 ////////////////////////////////////////////////////////////////////////////////
441 /// Update the circular ntple
442 
443 void TPacketizerFile::TSlaveStat::UpdatePerformance(Double_t time)
444 {
445  Double_t ttot = time;
446  Double_t *ar = fCircNtp->GetArgs();
447  Int_t ne = fCircNtp->GetEntries();
448  if (ne <= 0) {
449  // First call: just fill one ref entry and return
450  fCircNtp->Fill(0., 0);
451  fSpeed = 0.;
452  return;
453  }
454  // Fill the entry
455  fCircNtp->GetEntry(ne-1);
456  ttot = ar[0] + time;
457  fCircNtp->Fill(ttot, GetEntriesProcessed());
458 
459  // Calculate the speed
460  fCircNtp->GetEntry(0);
461  Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
462  Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
463  fSpeed = nevts / dtime;
464  PDB(kPacketizer,2)
465  Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
466  time, dtime, nevts, fSpeed);
467 
468 }
469 
470 ////////////////////////////////////////////////////////////////////////////////
471 /// Update the status info to the 'st'.
472 /// return the difference (*st - *fStatus)
473 
474 TProofProgressStatus *TPacketizerFile::TSlaveStat::AddProcessed(TProofProgressStatus *st)
475 {
476  if (st) {
477  // The entriesis not correct in 'st'
478  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
479  // The last proc time should not be added
480  fStatus->SetLastProcTime(0.);
481  // Get the diff
482  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
483  *fStatus += *diff;
484  // Set the correct value
485  fStatus->SetLastEntries(lastEntries);
486  return diff;
487  } else {
488  Error("AddProcessed", "status arg undefined");
489  return 0;
490  }
491 }
492 
493 ////////////////////////////////////////////////////////////////////////////////
494 /// Printf info
495 
497 {
498  Printf("Iterator '%s' controls %d units", GetName(),
499  ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
500  : -1));
501 }
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
const char * GetOrdinal() const
Definition: TSlave.h:135
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:108
long long Long64_t
Definition: RtypesCore.h:69
void Print(Option_t *options="") const
Print information about this object.
Definition: TFileInfo.cxx:475
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
Collectable string class.
Definition: TObjString.h:32
float Float_t
Definition: RtypesCore.h:53
const char Option_t
Definition: RtypesCore.h:62
This class represents a WWW compatible URL.
Definition: TUrl.h:41
virtual ~TPacketizerFile()
Destructor.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Long64_t GetBytesRead() const
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
static const char * filename()
void AddAssocObj(TObject *assocobj)
Add an associated object to the list.
Definition: TDSet.cxx:634
Basic string class.
Definition: TString.h:137
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
Double_t GetCurrentTime()
Get current time.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
Double_t GetProcTime() const
const char * Data() const
Definition: TString.h:349
Double_t GetCPUTime() const
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:839
#define SafeDelete(p)
Definition: RConfig.h:436
#define PDB(mask, level)
Definition: TProofDebug.h:58
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:36
void IncEntries(Long64_t entries=1)
std::map< std::string, std::string >::const_iterator iter
Definition: TAlienJob.cxx:54
void Info(const char *location, const char *msgfmt,...)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:234
void Continue()
Resume a stopped stopwatch.
Definition: TStopwatch.cxx:91
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
void Error(const char *location, const char *msgfmt,...)
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10485
TProofProgressStatus * fProgressStatus
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:594
A doubly linked list.
Definition: TList.h:47
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
void SetLastEntries(Long64_t entries)
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet.
ROOT::R::TRInterface & r
Definition: Object.C:4
THashList * GetList()
TObject * Next()
Definition: TCollection.h:158
Bool_t IsNull() const
Definition: TString.h:387
void SetName(const char *name)
Definition: TCollection.h:116
Long64_t GetEntries() const
#define Printf
Definition: TGeoToOCC.h:18
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
#define gPerfStats
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
long Long_t
Definition: RtypesCore.h:50
ClassImp(TPacketizerFile) TPacketizerFile
Constructor.
virtual Int_t GetSize() const
Definition: TCollection.h:95
void Print(std::ostream &os, const OptionType &opt)
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
double Double_t
Definition: RtypesCore.h:55
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
Int_t GetProtocol() const
Definition: TSlave.h:137
Mother of all ROOT objects.
Definition: TObject.h:58
Int_t BufferSize() const
Definition: TBuffer.h:92
TStopwatch * fStopwatch
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:246
virtual void Add(TObject *obj)
Definition: TList.h:81
Int_t Length() const
Definition: TBuffer.h:94
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:167
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
Definition: TSlave.h:50
const Bool_t kTRUE
Definition: Rtypes.h:91
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const Int_t n
Definition: legend1.C:16
Stopwatch class.
Definition: TStopwatch.h:30
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904