69   void        UpdatePerformance(
Double_t time);
 
   74class TPacketizerFile::TIterObj : 
public TObject {
 
   81   TIterObj(
const char *
n, 
TIter *iter) : fName(
n), fIter(iter) { }
 
   82   virtual ~TIterObj() { 
if (fIter) 
delete fIter; }
 
   84   const char *
GetName()
 const {
return fName;}
 
   85   TIter      *GetIter()
 const {
return fIter;}
 
   98   PDB(kPacketizer,1) 
Info(
"TPacketizerFile", 
"enter");
 
  105   if (!input || (input && input->
GetSize() <= 0)) {
 
  106      Error(
"TPacketizerFile", 
"input file is undefined or empty!");
 
  112   Int_t procnotass = 1;
 
  114      if (procnotass == 0) {
 
  115         Info(
"TPacketizerFile", 
"files not assigned to workers will not be processed");
 
  121   Int_t addfileinfo = 0;
 
  123      if (addfileinfo == 1) {
 
  124         Info(
"TPacketizerFile",
 
  125              "TFileInfo object will be included in the packet as associated object");
 
  132      Error(
"TPacketizerFile", 
"map of files to be processed/created not found");
 
  148      Info(
"TPacketizerFile", 
"worker: %s", wrkname.
Data());
 
  162   while ((key = nxl()) != 0) {
 
  166         if (
fc) wrklist = 
fc->GetList();
 
  175               Info(
"TPacketizerFile", 
"%d files of '%s' (fqdn: '%s') assigned to '%s'",
 
  185               Info(
"TPacketizerFile", 
"%d files of '%s' (fqdn: '%s') not assigned",
 
  197      Error(
"TPacketizerFile", 
"no file path in the map!");
 
  209   PDB(kPacketizer,1) 
Info(
"TPacketizerFile", 
"return");
 
  249      while ((key = nxw()) != 0) {
 
  251         if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
 
  253            currate += wrkstat->GetProgressStatus()->GetCurrentRate();
 
  274      Error(
"GetNextPacket", 
"could not find stat object for worker '%s'!", wrk->
GetName());
 
  282   Double_t latency = 0., proctime = 0., proccpu = 0.;
 
  297         numev = status->
GetEntries() - wrkstat->GetEntriesProcessed();
 
  298         progress = wrkstat->AddProcessed(status);
 
  303            totev  = status->GetEntries(); 
 
  309          Error(
"GetNextPacket", 
"no status came in the kPROOF_GETPACKET message");
 
  312      (*r) >> latency >> proctime >> proccpu;
 
  315      if (
r->BufferSize() > 
r->Length()) (*r) >> bytesRead;
 
  316      if (
r->BufferSize() > 
r->Length()) (*r) >> totalEntries;
 
  317      if (
r->BufferSize() > 
r->Length()) (*r) >> totev;
 
  319      numev = totev - wrkstat->GetEntriesProcessed();
 
  320      wrkstat->GetProgressStatus()->IncEntries(numev);
 
  321      wrkstat->GetProgressStatus()->SetLastUpdate();
 
  328      Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
 
  330                           numev, latency, proctime, proccpu, bytesRead);
 
  334                              latency, proctime, proccpu, bytesRead);
 
  350      Info(
"GetNextPacket", 
"worker-%s (%s): getting next files ... ", wrk->
GetOrdinal(),
 
  362         nextfile = io->GetIter()->Next();
 
  371            nextfile = io->GetIter()->Next();
 
  376   if (!nextfile) 
return elem;
 
  382   if ((os = 
dynamic_cast<TObjString *
>(nextfile))) {
 
  385      if ((fi = 
dynamic_cast<TFileInfo *
>(nextfile)))
 
  390      Warning(
"GetNextPacket", 
"found unsupported object of type '%s' in list: it must" 
  391                               " be 'TObjString' or 'TFileInfo'", nextfile->
ClassName());
 
  396      Info(
"GetNextPacket", 
"worker-%s: assigning: '%s' (remaining %lld files)",
 
  418TPacketizerFile::TSlaveStat::TSlaveStat(
TSlave *slave, 
TList *input)
 
  420                              fSpeed(0), fTimeInstant(0), fCircLvl(5)
 
  423   fCircNtp = 
new TNtupleD(
"Speed Circ Ntp", 
"Circular process info",
"tm:ev");
 
  425   fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
 
  426   fCircNtp->SetCircular(fCircLvl);
 
  434TPacketizerFile::TSlaveStat::~TSlaveStat()
 
  442void TPacketizerFile::TSlaveStat::UpdatePerformance(
Double_t time)
 
  446   Int_t ne = fCircNtp->GetEntries();
 
  449      fCircNtp->Fill(0., 0);
 
  454   fCircNtp->GetEntry(ne-1);
 
  456   fCircNtp->Fill(ttot, GetEntriesProcessed());
 
  459   fCircNtp->GetEntry(0);
 
  460   Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
 
  462   fSpeed = nevts / dtime;
 
  464      Info("UpdatePerformance", "time:%
f, dtime:%
f, nevts:%lld, speed: %
f",
 
  465                                time, dtime, nevts, fSpeed);
 
  479      fStatus->SetLastProcTime(0.);
 
  487      Error(
"AddProcessed", 
"status arg undefined");
 
  498          ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()
 
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
void Printf(const char *fmt,...)
static struct mg_connection * fc(struct mg_context *ctx)
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
void SetName(const char *name)
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e. the current total amount of space that has been allocated so far.
Manages an element of a TDSet.
void AddAssocObj(TObject *assocobj)
Add an associated object to the list.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
void Print(Option_t *options="") const
Print information about this object.
TUrl * GetCurrentUrl() const
Return the current url.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject's.
virtual void Add(TObject *obj)
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval (therefore TMap does not conserve the order of the entries).
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
A simple TTree restricted to a list of double variables only.
Collectable string class.
const char * GetName() const
Returns name of object.
Mother of all ROOT objects.
virtual const char * GetName() const
Returns name of object.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
@ kInvalidObject
if object ctor succeeded but object should not be used
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
This packetizer generates packets which contain a single file path to be used in process.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
virtual ~TPacketizerFile()
Destructor.
TDSetElement * GetNextPacket(TSlave *wrk, TMessage *r)
Get next packet.
Double_t GetCurrentTime()
Get current time.
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TObject * GetParameter(const char *par) const
Get specified parameter.
Class describing a PROOF worker server.
Int_t GetProtocol() const
const char * GetName() const
Returns name of object.
const char * GetOrdinal() const
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start and stop events.
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
void Continue()
Resume a stopped stopwatch.
const char * Data() const
This class represents a WWW compatible URL.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
The packetizer is a load balancing object created for each query.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
std::string GetName(const std::string &scope_name)
void Print(std::ostream &os, const OptionType &opt)