#include "TPacketizerUnit.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TMath.h"
#include "TObjString.h"
using namespace TMath;
class TPacketizerUnit::TSlaveStat : public TObject {
friend class TPacketizerUnit;
private:
   TSlave   *fSlave;         
   Long64_t  fProcessed;     
   Long64_t  fLastProcessed; 
   Double_t  fSpeed;         
   Double_t  fTimeInstant;   
   TNtupleD *fCircNtp;       
   Long_t    fCircLvl;       
public:
   TSlaveStat(TSlave *sl, TList *input);
   ~TSlaveStat();
   void        GetCurrentTime();
   const char *GetName() const { return fSlave->GetName(); }
   Long64_t    GetEntriesProcessed() const { return fProcessed; }
   void        UpdatePerformance(Double_t time);
};
TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
                            : fSlave(slave), fProcessed(0), fLastProcessed(0),
                              fSpeed(0), fTimeInstant(0)
{
   
   
   fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
   TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
   fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
   fCircNtp->SetCircular(fCircLvl);
}
TPacketizerUnit::TSlaveStat::~TSlaveStat()
{
   
   SafeDelete(fCircNtp);
}
void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
{
   
   Double_t ttot = time;
   Double_t *ar = fCircNtp->GetArgs();
   Int_t ne = fCircNtp->GetEntries();
   if (ne <= 0) {
      
      fCircNtp->Fill(0., 0);
      fSpeed = 0.;
      return;
   }
   
   fCircNtp->GetEntry(ne-1);
   ttot = ar[0] + time;
   fCircNtp->Fill(ttot, fProcessed);
   
   fCircNtp->GetEntry(0);
   Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
   Long64_t nevts = fProcessed - (Long64_t)ar[1];
   fSpeed = nevts / dtime;
   PDB(kPacketizer,2)
      Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
                                time, dtime, nevts, fSpeed);
}
ClassImp(TPacketizerUnit)
TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input)
                : TVirtualPacketizer(input)
{
   
   PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
   
   fSlaveStats = 0;
   fPackets = 0;
   fTimeLimit = 1;
   TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", fTimeLimit);
   PDB(kPacketizer,1)
      Info("TPacketizerUnit", "time limit is %lf", fTimeLimit);
   fProcessing = 0;
   fStopwatch = new TStopwatch();
   fPackets = new TList;
   fPackets->SetOwner();
   fSlaveStats = new TMap;
   fSlaveStats->SetOwner(kFALSE);
   TSlave *slave;
   TIter si(slaves);
   while ((slave = (TSlave*) si.Next()))
      fSlaveStats->Add(slave, new TSlaveStat(slave, input));
   fTotalEntries = num;
   fStopwatch->Start();
   PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
}
TPacketizerUnit::~TPacketizerUnit()
{
   
   if (fSlaveStats)
      fSlaveStats->DeleteValues();
   SafeDelete(fSlaveStats);
   SafeDelete(fPackets);
   SafeDelete(fStopwatch);
}
Double_t TPacketizerUnit::GetCurrentTime()
{
   
   Double_t retValue = fStopwatch->RealTime();
   fStopwatch->Continue();
   return retValue;
}
Long64_t TPacketizerUnit::GetEntriesProcessed(TSlave *slave) const
{
   
   if (!fSlaveStats)
      return 0;
   TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(slave);
   if (!slstat)
      return 0;
   return slstat->GetEntriesProcessed();
}
TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
{
   
   if (!fValid)
      return 0;
   
   TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(sl);
   R__ASSERT(slstat != 0);
   
   Double_t latency, proctime, proccpu;
   Long64_t bytesRead = -1;
   Long64_t totalEntries = -1;
   (*r) >> latency >> proctime >> proccpu;
   
   if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
   if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
   Long64_t totev = 0;
   if (r->BufferSize() > r->Length()) (*r) >> totev;
   Long64_t numev = totev - slstat->fProcessed;
   PDB(kPacketizer,2)
      Info("GetNextPacket","worker-%s: fProcessed %lld\t", sl->GetOrdinal(), fProcessed);
   fProcessing = 0;
   PDB(kPacketizer,2)
      Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
                           sl->GetOrdinal(), sl->GetName(),
                           numev, latency, proctime, proccpu, bytesRead);
   if (gPerfStats != 0) {
      gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
                              latency, proctime, proccpu, bytesRead);
   }
   if (fProcessed == fTotalEntries) {
      
      HandleTimer(0);
      SafeDelete(fProgress);
   }
   if (fStop) {
      
      HandleTimer(0);
      return 0;
   }
   Long64_t num;
   
   Double_t cTime = GetCurrentTime();
   if (slstat->fCircNtp->GetEntries() <= 0) {
      
      PDB(kPacketizer,2) Info("GetNextPacket"," calibration: "
                              "total entries %lld, workers %d",
                              fTotalEntries, fSlaveStats->GetSize());
      Long64_t avg = fTotalEntries/(fSlaveStats->GetSize());
      num = (avg > 5) ? 5 : avg;
      
      slstat->UpdatePerformance(0.);
   } else {
      
      
      slstat->fProcessed += slstat->fLastProcessed;
      slstat->UpdatePerformance(proctime);
      TMapIter *iter = (TMapIter *)fSlaveStats->MakeIterator();
      TSlave * tmpSlave;
      TSlaveStat * tmpStat;
      Double_t sumSpeed = 0;
      Double_t sumBusy = 0;
      
      
      
      
      
      
      
      
      while ((tmpSlave = (TSlave *)iter->Next())) {
         tmpStat = (TSlaveStat *)fSlaveStats->GetValue(tmpSlave);
         
         if ((cTime - tmpStat->fTimeInstant) > 4*fTimeLimit)
            tmpStat->fSpeed = 0;
         PDB(kPacketizer,2)
            Info("GetNextPacket",
                 "worker-%s: speed %lf", tmpSlave->GetOrdinal(), tmpStat->fSpeed);
         if (tmpStat->fSpeed > 0) {
            
            sumSpeed += tmpStat->fSpeed;
            
            if ((tmpStat->fTimeInstant) && (cTime - tmpStat->fTimeInstant > 0)) {
               
               
               
               
               Double_t busyspeed =
                  ((tmpStat->fTimeInstant - cTime)*tmpStat->fSpeed + tmpStat->fLastProcessed);
               if (busyspeed > 0) 
                  sumBusy += busyspeed;
            }
         }
      }
      PDB(kPacketizer,2)
         Info("GetNextPacket", "worker-%s: sum speed: %lf, sum busy: %f",
                               sl->GetOrdinal(), sumSpeed, sumBusy);
      
      if ((fTotalEntries - fProcessed)/(slstat->fSpeed) < fTimeLimit) {
         num = (fTotalEntries - fProcessed);
      } else {
         
         Double_t optTime = (fTotalEntries - fProcessed + sumBusy)/sumSpeed;
         
         
         
         num = (optTime > fTimeLimit) ? Nint(fTimeLimit*(slstat->fSpeed)) : Nint(optTime*(slstat->fSpeed));
         PDB(kPacketizer,2) 
            Info("GetNextPacket", "opTime %lf", optTime);
      }
   }
   fProcessing = (num < (fTotalEntries - fProcessed)) ? num : (fTotalEntries - fProcessed);
   
   slstat->fLastProcessed = fProcessing;
   
   slstat->fTimeInstant = cTime;
   PDB(kPacketizer,2)
      Info("GetNextPacket", "worker-%s: num  %lld processing %lld remained %lld",sl->GetOrdinal(),
                            num, fProcessing, (fTotalEntries - fProcessed - fProcessing));
   TDSetElement *elem = new TDSetElement("", "", "", 0, fProcessing);
   elem->SetBit(TDSetElement::kEmpty);
   
   fProcessed += slstat->fLastProcessed;
   return elem;
}
Last change: Tue May 13 17:22:24 2008
Last generated: 2008-05-13 17:22
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.