// @(#)root/proofplayer:$Id: TPacketizerProgressive.h 20882 2007-11-19 11:31:26Z rdm $
// Author: Zev Benjamin  13/09/2005

/*************************************************************************
 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TPacketizerProgressive                                               //
//                                                                      //
// This class is one of PROOF packetizers.                              //
// Packetizer generates packets to be processed on PROOF worker servers.//
// A packet is an event range (begin entry and number of entries) or    //
// object range (first object and number of objects) in a TTree         //
// (entries) or a directory (objects) in a file.                        //
// Packets are generated taking into account the performance of the     //
// remote machine, the time it took to process a previous packet on     //
// the remote machine, the locality of the database files, etc.         //
//                                                                      //
// The TPacketizerProgressive does not pre-open the files to calculate  //
// the total number of events. It just walks sequentially through the   //
// list of files.                                                       //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#ifndef ROOT_TPacketizerProgressive
#define ROOT_TPacketizerProgressive

#ifndef ROOT_TVirtualPacketizer
#include "TVirtualPacketizer.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif

class TDSet;
class TDSetElement;
class THashTable;
class TList;
class TMap;
class TMessage;
class TSlave;
class TTimer;

class TPacketizerProgressive : public TVirtualPacketizer {

public:
   class TFileStat;

   class TFileNode : public TObject {
   private:
      TString        fNodeName;        // FQDN of the node
      TList         *fFiles;           // TDSetElements (files) stored on this node
      TObject       *fUnAllocFileNext; // cursor in fFiles
      TList         *fActFiles;        // files with work remaining
      TObject       *fActFileNext;     // cursor in fActFiles
      Int_t          fMySlaveCnt;      // number of slaves running on this node
      Int_t          fSlaveCnt;        // number of external slaves processing files on this node
   public:
      TFileNode(const char *name);
      ~TFileNode();

      void        IncMySlaveCnt() { fMySlaveCnt++; }
      void        IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
      void        DecSlaveCnt(const char *slave);
      Int_t       GetSlaveCnt() const { return fMySlaveCnt + fSlaveCnt; }
      Int_t       GetNumberOfActiveFiles() const;
      Bool_t      IsSortable() const { return kTRUE; }
      const char *GetName() const { return fNodeName; }
      void        Add(TDSetElement *elem);
      TFileStat  *GetNextUnAlloc();
      TFileStat  *GetNextActive();
      void        RemoveActive(TFileStat *file);
      Bool_t      HasActiveFiles();
      Bool_t      HasUnAllocFiles() {if (fUnAllocFileNext) return kTRUE; return kFALSE; }
      Int_t       Compare(const TObject *other) const;
      void        Print(Option_t *opt ="") const;
      void        Reset();
   };

   class TFileStat : public TObject {
   private:
      Bool_t        fIsDone;       // is this element processed
      TFileNode    *fNode;         // my FileNode
      TDSetElement *fElement;      // location of the file and its range
      Long64_t      fNextEntry;    // cursor in the range, -1 when done
   public:
      TFileStat(TFileNode *node, TDSetElement *elem);

      Bool_t        IsDone() const { return fIsDone; }
      void          SetDone() { fIsDone = kTRUE; }
      TFileNode    *GetNode() const { return fNode; }
      TDSetElement *GetElement() const { return fElement; }
      Long64_t      GetNextEntry() const { return fNextEntry; }
      void          MoveNextEntry(Long64_t step) { fNextEntry += step; }
   };

   class TSlaveStat : public TObject {
   private:
      TSlave       *fSlave;        // corresponding TSlave record
      TFileNode    *fFileNode;     // corresponding node or 0
      TFileStat    *fCurFile;      // file currently being processed
      TDSetElement *fCurElem;      // TDSetElement currently being processed
      Long64_t      fProcessed;    // number of entries processed
   public:
      TSlaveStat(TSlave *slave);

      TFileNode      *GetFileNode() const { return fFileNode; }
      TFileStat      *GetCurrentFile() const { return fCurFile; }
      TDSetElement   *GetCurrentElement() const { return fCurElem; }
      const char     *GetName() const;
      Long64_t        GetEntriesProcessed() const { return fProcessed; }
      void            SetFileNode(TFileNode *node) { fFileNode = node; }
      void            SetCurrentFile(TFileStat *file) { fCurFile = file; }
      void            SetCurrentElement(TDSetElement* elem) { fCurElem = elem; }
      void            IncEntriesProcessed(Long64_t n) { fProcessed += n; }
   };

private:
   enum {
      kSlaveHostConnLim    = 2,
      kNonSlaveHostConnLim = 2,
      kEntryListSize       = 5
   };

   TDSet      *fDset;
   TList      *fSlaves;
   TList      *fSlavesRemaining;  // slaves stilll working
   Long64_t    fFirstEvent;
   Long64_t    fTotalEvents;

   Long64_t    fEntriesSeen;      // number of entries found so far
   Long64_t    fFilesOpened;      // total number of files with their entries recorded
   Long64_t    fEstTotalEntries;  // estimated total number of entries
   TMap       *fSlaveStats;       // map of slave addresses to its TSlaveStat object
   THashTable *fNewFileSlaves;    // slaves that have just opened a new file and need to
                                  // record the number of entries in them (keyed by TSlaveStat)

   TList      *fUnAllocSlaves;    // slave hosts that have unallocated files
   TList      *fUnAllocNonSlaves; // non-slave hosts that have unallocated files
   TList      *fActiveSlaves;     // slave hosts that have active files
   TList      *fActiveNonSlaves;  // non-slave hosts that have active files

   TList      *fLastEntrySizes;   // list of the last kEntryListSize TDSetElement sizes (in entries)
   Long64_t    fPacketSize;       // current packet size based on estimate of total number of entries

   TPacketizerProgressive();
   TPacketizerProgressive(const TPacketizerProgressive&);

   void          RecalculatePacketSize(Long64_t newCount);
   TFileStat    *GetNextActive(TSlaveStat *stat);
   TFileStat    *GetNextUnAlloc(TSlaveStat *stat);
   TDSetElement *BuildPacket(TSlaveStat *stat, Long64_t num);

   void Init();

   virtual Bool_t HandleTimer(TTimer *timer);

public:
   TPacketizerProgressive(TDSet *dset, TList *slaves,
                          Long64_t first, Long64_t num,
                          TList *input);
   virtual ~TPacketizerProgressive();

   Long64_t      GetEntriesProcessed(TSlave *s) const;
   Long64_t      GetTotalEntries() const { return fEstTotalEntries; }
   TDSetElement *GetNextPacket(TSlave *s, TMessage *r);

   Long64_t      GetBytesRead() const { return 0; }
   Float_t       GetInitTime() const { return 0; }
   Float_t       GetProcTime() const { return 0; }

   ClassDef(TPacketizerProgressive, 0);  // Packetizer that does not pre-open any files
};

#endif

Last update: Thu Jan 17 09:00:48 2008

This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.