// @(#)root/treeplayer:$Name:  $:$Id: TPacketGenerator.cxx,v 1.1.1.1 2000/05/16 17:00:44 rdm Exp $
// Author: Fons Rademakers   28/03/97

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TPacketGenerator                                                     //
//                                                                      //
// This class generates packets to be processed on PROOF slave servers. //
// A packet is an entry range (begin entry and number of entries).      //
// Packets are generated taking into account the performance of the     //
// remote machine, the time it took to process a previous packet on     //
// the remote machine, the locality of the database files, etc.         //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TPacketGenerator.h"
#include "TObjArray.h"
#include "TList.h"
#include "TSlave.h"
#include "TTree.h"
#include "TTime.h"
#include "TSystem.h"


// TPacket class used by the TPacketGenerator.

class TPacket : public TObject {

public:
   // Construction / destruction
   TPacket(TSlave *sl, Stat_t first, Int_t nentries);
   ~TPacket() { }

   // Read-only access to the recorded packet description
   TSlave *GetSlave() const
      { return fSlave; }
   Stat_t  GetFirstEntry() const
      { return fFirst; }
   Int_t   GetPacketSize() const
      { return fNentries; }
   TTime   GetTime() const
      { return fTime; }

   // Dump the packet description via Printf
   void    Print(Option_t *option ="");

private:
   TSlave   *fSlave;    // slave the packet was handed to (not owned)
   Stat_t    fFirst;    // number of the first entry covered by the packet
   Int_t     fNentries; // how many entries the packet covers
   TTime     fTime;     // time stamp taken when the packet was created
};

TPacket::TPacket(TSlave *sl, Stat_t first, Int_t nentries)
   : fSlave(sl),
     fFirst(first),
     fNentries(nentries),
     fTime(gSystem->Now())
{
   // Describe a packet of nentries entries starting at entry first that is
   // handed to slave sl. The packet is time-stamped with gSystem->Now().
}

void TPacket::Print(Option_t *)
{
   // Print the packet description: name of the slave the packet was given
   // to, first entry, number of entries and creation time stamp.
   // The option argument is ignored.

   Printf("    Processed on host:     %s",   fSlave->GetName());
   Printf("    Begin entry:           %.0f", fFirst);
   Printf("    Number of entries:     %d",   fNentries);
   // The time stamp is passed as a (signed) long, so it has to be printed
   // with %ld; %lu does not match the argument type.
   Printf("    Starting time:         %ld",  long(fTime));
}


ClassImp(TPacketGenerator)

//______________________________________________________________________________
 TPacketGenerator::TPacketGenerator()
{
   // Private default ctor. Puts every data member into a defined state so
   // that an object created this way never exposes indeterminate values
   // (e.g. fMaxOrd, which Reset() compares against, or the counters that
   // Print() reports).

   // No tree, no slaves and no bookkeeping containers yet.
   fTree              = 0;
   fSlaves            = 0;
   fLastPackets       = 0;
   fPackets           = 0;
   fEntriesPerSlave   = 0;

   // Empty entry range: fNextEntry > fLastEntry means "nothing to hand out".
   fNextEntry         = 0;
   fLastEntry         = -1;

   // Packet size bookkeeping and statistics.
   fInitialPacketSize = 0;
   fMinPacketSize     = 0;
   fSmallestPacket    = 0;
   fEntriesProcessed  = 0;
   fMinTimeDiff       = 0;

   // -1 never equals (highest ordinal + 1) - 1 for a real slave list, so the
   // first Reset() always allocates the per-slave arrays.
   fMaxOrd            = -1;
}

//______________________________________________________________________________
 TPacketGenerator::TPacketGenerator(Stat_t firstEntry, Stat_t lastEntry, TTree *tree, TList *slaves)
{
   // Set up a packet generator for the entries firstEntry..lastEntry (both
   // inclusive) of a TTree, to be distributed over the given list of slaves.
   // The initial packet size is taken from tree->GetPacketSize(), so tree
   // must not be null.

   fTree              = tree;
   fSlaves            = slaves;
   fInitialPacketSize = tree->GetPacketSize();
   fLastEntry         = lastEntry;
   fNextEntry         = firstEntry;

   Init();
}

//______________________________________________________________________________
 TPacketGenerator::TPacketGenerator(Stat_t firstEntry, Stat_t lastEntry, Int_t packetSize, TList *slaves)
{
   // Set up a packet generator that is not attached to a TTree: the entries
   // firstEntry..lastEntry (both inclusive) are distributed over the given
   // list of slaves, starting with packets of packetSize entries.

   fTree              = 0;           // no tree: Reset() keeps the packet size
   fSlaves            = slaves;
   fInitialPacketSize = packetSize;
   fLastEntry         = lastEntry;
   fNextEntry         = firstEntry;

   Init();
}

//______________________________________________________________________________
 TPacketGenerator::~TPacketGenerator()
{
   // TPacketGenerator dtor. Calls Delete() on the list of generated packets
   // before freeing the list itself. The array of last packets per slave is
   // freed without calling Delete() on it, as is the per-slave entry counter
   // array.

   if (fPackets) {
      fPackets->Delete();
      delete fPackets;
   }
   delete [] fEntriesPerSlave;
   delete fLastPackets;
}

//______________________________________________________________________________
 void TPacketGenerator::Init()
{
   // Initialize the TPacketGenerator. Called by the ctors.

   // loop over all slaves and find highest ordinal number
   TIter   next(fSlaves);
   TSlave *sl;
   Int_t   mxo = 0;
   while ((sl = (TSlave*) next()))
      mxo = TMath::Max(mxo, sl->GetOrdinal());
   fMaxOrd = mxo;
   mxo++;

   fEntriesProcessed = 0;
   fMinPacketSize    = fInitialPacketSize/5;  // 20% of initial packet size
   fSmallestPacket   = fInitialPacketSize;
   fEntriesPerSlave  = new Stat_t[mxo];
   fLastPackets      = new TObjArray(mxo);
   fPackets          = new TList;
}

//______________________________________________________________________________
 Stat_t TPacketGenerator::GetEntriesProcessed(TSlave *sl) const
{
   // Return the per-slave entry counter of slave sl. The counter is looked
   // up by the slave's ordinal number; no range check is done.

   const Int_t ordinal = sl->GetOrdinal();
   return fEntriesPerSlave[ordinal];
}

//______________________________________________________________________________
 Bool_t TPacketGenerator::GetNextPacket(TSlave *sl, Int_t &nentry, Stat_t &first)
{
   // Hand out the next packet to slave sl.
   //
   // On success kTRUE is returned, first is set to the first entry of the
   // packet and nentry to the number of entries in it. When no entries are
   // left kFALSE is returned with first == 0 and nentry == -1.
   //
   // The size of the packet depends on how long the slave took for its
   // previous packet compared to the shortest time seen so far
   // (fMinTimeDiff): a slower slave gets a proportionally smaller packet
   // (but never below fMinPacketSize), a faster one a proportionally larger
   // packet (but never above the initial packet size). A slave that has not
   // been given a packet yet starts with the initial packet size.

   const Int_t ord  = sl->GetOrdinal();
   Int_t       size = fInitialPacketSize;

   // The slave asking for work has finished its previous packet (if any):
   // account for it and derive the new packet size from the elapsed time.
   TPacket *prev = (TPacket*)fLastPackets->UncheckedAt(ord);
   if (prev) {
      const Int_t done = prev->GetPacketSize();
      fEntriesPerSlave[ord] += done;
      fEntriesProcessed     += done;
      size = done;

      TTime elapsed = gSystem->Now() - prev->GetTime();
      if (long(fMinTimeDiff) == 0) {
         // no reference time yet
         fMinTimeDiff = elapsed;
      } else if (elapsed < fMinTimeDiff) {
         if (size == fInitialPacketSize) {
            // full-size packet done faster than ever: new reference time
            fMinTimeDiff = elapsed;
         } else {
            // reduced packet done faster than the reference: grow it again
            Double_t gain = long(fMinTimeDiff - elapsed);
            gain /= long(fMinTimeDiff);
            size += Int_t(gain*size);
            if (size > fInitialPacketSize)
               size = fInitialPacketSize;
         }
      } else {
         // slower than (or equal to) the reference: shrink the packet
         Double_t loss = long(elapsed - fMinTimeDiff);
         loss /= long(fMinTimeDiff);
         size -= Int_t(loss*size);
         if (size < fMinPacketSize)
            size = fMinPacketSize;
         if (size < fSmallestPacket)
            fSmallestPacket = size;
      }
   }

   // Nothing left to distribute: forget the slave's last packet and signal
   // the end to the caller.
   if (fNextEntry > fLastEntry) {
      fLastPackets->AddAt(0, ord);
      nentry = -1;
      first  = 0;
      return kFALSE;
   }

   // Do not run past the last entry.
   if (fNextEntry + size > fLastEntry+1)
      size = Int_t(fLastEntry - fNextEntry + 1);

   // Register the new packet, both in the full list and as the slave's
   // current packet, and advance the entry cursor.
   TPacket *packet = new TPacket(sl, fNextEntry, size);
   fPackets->Add(packet);
   fLastPackets->AddAt(packet, ord);
   fNextEntry += size;

   first  = packet->GetFirstEntry();
   nentry = packet->GetPacketSize();
   return kTRUE;
}

//______________________________________________________________________________
 Int_t TPacketGenerator::GetNoPackets() const
{
   // Return the size of the list of generated packets, or 0 when the list
   // has not been created.

   return fPackets ? fPackets->GetSize() : 0;
}

//______________________________________________________________________________
 void TPacketGenerator::Print(Option_t *option)
{
   // Prints info about the packet generator. If option is "all" (compared
   // case-insensitively), print also info about all packets.
   //
   // Safe to call when no packet has been generated yet (e.g. right after
   // Reset()) and when the slave list or the bookkeeping arrays are not set:
   // the corresponding figures are then reported as 0.

   const Int_t npackets = fPackets ? fPackets->GetSize() : 0;
   const Int_t nslaves  = fSlaves  ? fSlaves->GetSize()  : 0;

   Float_t tott = 0.0;
   Float_t aves = 0.0;
   Float_t avet = 0.0;
   // First()/Last() and the averages are only meaningful for a non-empty
   // list; on an empty list they would dereference a null packet and
   // divide by zero.
   if (npackets > 0) {
      const long span = long(((TPacket*)fPackets->Last())->GetTime()) -
                        long(((TPacket*)fPackets->First())->GetTime());
      tott = span / 1000.0;
      aves = fEntriesProcessed / npackets;
      avet = span / Float_t(npackets);
   }

   // Print generator summary
   Printf("Total entries processed:            %.0f", fEntriesProcessed);
   Printf("Total number of packets:            %d",   npackets);
   Printf("Default packet size:                %d",   fInitialPacketSize);
   Printf("Smallest packet size:               %d",   fSmallestPacket);
   Printf("Average packet size:                %.2f", aves);
   Printf("Total time (s):                     %.2f", tott);
   Printf("Average time between packets (ms):  %.2f", avet);
   Printf("Shortest time for packet (ms):      %ld",  long(fMinTimeDiff));
   Printf("Number of active slaves:            %d",   nslaves);

   // Per-slave counters; the printed number is the position of the slave in
   // the list, the counter itself is looked up by the slave's ordinal.
   if (fSlaves && fEntriesPerSlave) {
      TIter   next(fSlaves);
      TSlave *sl;
      int     i = 0;
      while ((sl = (TSlave*) next())) {
         Printf("   Number of entries processed by slave %d:  %.0f", i, fEntriesPerSlave[sl->GetOrdinal()]);
         i++;
      }
   }

   if (npackets > 0 && option && !strcasecmp(option,"all")) {
      fPackets->ForEach(TPacket,Print)(option);
   }
}

//______________________________________________________________________________
 void TPacketGenerator::Reset(Stat_t firstEntry, Stat_t lastEntry, TList *slaves)
{
   // Reset the packet generator for a new cycle over the entries
   // firstEntry..lastEntry (both inclusive) using the given list of slaves.
   //
   // All previously generated packets are deleted, the statistics are
   // cleared and the per-slave bookkeeping is resized if the highest slave
   // ordinal changed. When the generator is attached to a TTree the initial
   // packet size is re-read from the tree.

   // Find highest ordinal slave number; one slot per ordinal is needed.
   TIter   next(slaves);
   TSlave *sl;
   Int_t   nslots = 0;
   while ((sl = (TSlave*) next()))
      nslots = TMath::Max(nslots, sl->GetOrdinal());
   nslots++;

   // Drop the packets of the previous cycle (create the list if this
   // generator never had one).
   if (fPackets)
      fPackets->Delete();
   else
      fPackets = new TList;

   if (nslots != fMaxOrd+1 || !fLastPackets || !fEntriesPerSlave) {
      // Different number of slots (or nothing allocated yet): reallocate.
      delete fLastPackets;
      delete [] fEntriesPerSlave;
      fLastPackets      = new TObjArray(nslots);
      fEntriesPerSlave  = new Stat_t[nslots];
      fMaxOrd = nslots - 1;
   } else {
      // Same layout: just forget the last packet of every slave.
      for (int i = 0; i < nslots; i++)
         fLastPackets->AddAt(0,i);
   }

   // The counters must start at zero in both cases; new[] leaves a freshly
   // allocated array uninitialized.
   for (int i = 0; i < nslots; i++)
      fEntriesPerSlave[i] = 0;

   if (fTree)
      fInitialPacketSize = fTree->GetPacketSize();

   fNextEntry        = firstEntry;
   fLastEntry        = lastEntry;
   fSlaves           = slaves;
   fEntriesProcessed = 0;
   fMinPacketSize    = fInitialPacketSize/5;  // 20% of initial packet size
   fMinTimeDiff      = 0;
   fSmallestPacket   = fInitialPacketSize;
}

//______________________________________________________________________________
 void TPacketGenerator::SetMinPacketSize(Int_t minps)
{
   // Set the minimum packet size. Only values strictly between 0 and the
   // initial packet size are accepted; any other value is silently ignored.
   // Unless changed here, the minimum is 20% of the initial packet size
   // (as set up by Init() and Reset()).

   const Bool_t outOfRange = (minps <= 0) || (minps >= fInitialPacketSize);
   if (outOfRange)
      return;

   fMinPacketSize = minps;
}


// ROOT page - Class index - Top of the page
//
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.