Logo ROOT   6.08/07
Reference Guide
TVirtualPacketizer.h
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 9/7/2002
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TVirtualPacketizer
13 #define ROOT_TVirtualPacketizer
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TVirtualPacketizer //
18 // //
19 // Packetizer is a load balancing object created for each query. //
20 // It generates packets to be processed on PROOF worker servers. //
21 // A packet is an event range (begin entry and number of entries) or //
22 // object range (first object and number of objects) in a TTree //
23 // (entries) or a directory (objects) in a file. //
24 // Packets are generated taking into account the performance of the //
25 // remote machine, the time it took to process a previous packet on //
26 // the remote machine, the locality of the database files, etc. //
27 // //
28 // TVirtualPacketizer includes common parts of PROOF packetizers. //
29 // Look in subclasses for details. //
30 // The default packetizer is TPacketizerAdaptive. //
31 // To use an alternative one, for instance - the TPacketizer, call: //
32 // proof->SetParameter("PROOF_Packetizer", "TPacketizer"); //
33 // //
34 //////////////////////////////////////////////////////////////////////////
35 
36 #ifndef ROOT_TObject
37 #include "TObject.h"
38 #endif
39 #ifndef ROOT_TSlave
40 #include "TSlave.h"
41 #endif
42 #ifndef ROOT_TProofProgressStatus
43 #include "TProofProgressStatus.h"
44 #endif
45 #ifndef ROOT_TTime
46 #include "TTime.h"
47 #endif
48 
49 
50 class TDSet;
51 class TDSetElement;
52 class TList;
53 class TMap;
54 class TMessage;
55 class TNtuple;
56 class TNtupleD;
57 class TProofProgressInfo;
58 class TSlave;
59 
60 
61 class TVirtualPacketizer : public TObject { // common base of the PROOF packetizers (see header comment above)
62 
63 public: // public because of Sun CC bug
64  class TVirtualSlaveStat;
65 
66 protected:
67  enum EUseEstOpt { // Option for usage of estimated values
68  kEstOff = 0, // NOTE(review): further enumerators (listing lines 69-70) are elided in this listing
71  };
72 
73  // General configuration parameters
74  Double_t fMinPacketTime; // minimum packet time
75  Double_t fMaxPacketTime; // maximum packet time
76  TList *fConfigParams; // List of configuration parameters
77 
78  TMap *fSlaveStats; // slave status, keyed by corresponding TSlave
79 
80  TProofProgressStatus *fProgressStatus; // pointer to status in the player; may be null (most getters below check for it)
81  TTimer *fProgress; // progress updates timer
82 
83  Long64_t fTotalEntries; // total number of entries to be distributed;
84  // not used in the progressive packetizer
85  TList *fFailedPackets;// a list of packets that failed while processing
86 
87  // Members for progress info
88  TTime fStartTime; // time offset
89  Float_t fInitTime; // time before processing
90  Float_t fProcTime; // time since start of processing
91  Float_t fTimeUpdt; // time between updates
92  TNtupleD *fCircProg; // Keeps circular info for "instantaneous"
93  // rate calculations
94  Long_t fCircN; // Circularity
95 
96  TNtuple *fProgressPerf; // {Active workers, evt rate, MBs read} as a function of processing time
97  Float_t fProcTimeLast; // Time of the last measurement
98  Int_t fActWrksLast; // Active workers at fProcTimeLast
99  Float_t fEvtRateLast; // Evt rate at fProcTimeLast
100  Float_t fMBsReadLast; // MBs read at fProcTimeLast
101  Float_t fEffSessLast; // Number of effective sessions at fProcTimeLast
102  Bool_t fAWLastFill; // Whether to fill the last measurement
103  Float_t fReportPeriod; // Time between reports if nothing changes (estimated proc time / 100)
104 
105  EUseEstOpt fUseEstOpt; // Control usage of estimated values for the progress info
106 
107  Bool_t fValid; // Constructed properly?
108  Bool_t fStop; // Termination of Process() requested?
109 
110  TString fDataSet; // Name of the dataset being processed (for dataset-driven runs)
111 
112  TList *fInput; // Input list
113 
115  TVirtualPacketizer(const TVirtualPacketizer &); // no implementation, will generate
116  void operator=(const TVirtualPacketizer &); // error on accidental usage
117 
119  Long64_t GetEntries(Bool_t tree, TDSetElement *e); // Num of entries or objects
120  virtual Bool_t HandleTimer(TTimer *timer);
121 
122 public:
123  enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
124  virtual ~TVirtualPacketizer();
125 
126  virtual Int_t AssignWork(TDSet* /*dset*/, Long64_t /*first*/, Long64_t /*num*/) { return -1; } // base implementation does nothing and returns -1
127  Bool_t IsValid() const { return fValid; }
128  Long64_t GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); } // 0 when no progress status is set
130  { ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; } // body of a method whose signature line is elided in this listing; copies the current counters into the outputs
131  virtual Float_t GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); } // always sets 'all' to kTRUE; 0 when no progress status is set
133  virtual TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
134  virtual void SetInitTime();
135  virtual void StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE);
137  void SetFailedPackets(TList *list) { fFailedPackets = list; } // stores the pointer only; NOTE(review): ownership is not visible here
138  virtual Int_t AddWorkers(TList *workers);
139 
140  Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); } // 0 when no progress status is set
141  Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); } // 0 when no progress status is set
142  Double_t GetCumProcTime() const { return fProgressStatus->GetProcTime(); } // NOTE(review): unlike the getters above, fProgressStatus is dereferenced without a null check
143  Float_t GetInitTime() const { return fInitTime; }
144  Float_t GetProcTime() const { return fProcTime; }
145  TNtuple *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
146  } else { return fProgressPerf;} } // with steal the member is zeroed after the pointer is handed out
147  TList *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
148  } else { return fConfigParams;} } // with steal the member is zeroed after the pointer is handed out
149  virtual void MarkBad(TSlave * /*s*/, TProofProgressStatus * /*status*/, TList ** /*missingFiles*/) { return; } // no-op in the base class
150  virtual Int_t AddProcessed(TSlave * /*sl*/, TProofProgressStatus * /*st*/,
151  Double_t /*lat*/, TList ** /*missingFiles*/) { return 0; } // base implementation ignores its arguments and returns 0
153  void SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
154  void SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
155 
156  TMap *GetSlaveStats() const { return fSlaveStats; }
157 
158  virtual Int_t GetActiveWorkers() { return -1; } // base implementation returns -1
159 
160  ClassDef(TVirtualPacketizer,0) //Generate work packets for parallel processing
161 };
162 
163 //------------------------------------------------------------------------------
164 
166 
167 friend class TPacketizerAdaptive; // NOTE(review): the enclosing class header (listing line 165) is elided in this listing
168 friend class TPacketizer;
169 
170 protected:
171  TString fWrkFQDN; // Worker FQDN
172  TSlave *fSlave; // corresponding TSlave record
173  TProofProgressStatus *fStatus; // status as of the last finished packet; may be null (see the checks below)
174 
175 public:
176  const char *GetName() const { return fWrkFQDN.Data(); } // the worker FQDN serves as the object name
177  const char *GetOrdinal() const { return fSlave->GetOrdinal(); } // NOTE(review): fSlave is dereferenced without a null check
178  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; } // -1 when no status is available
179  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; } // -1 when no status is available
180  Float_t GetAvgRate() { return fStatus->GetRate(); } // NOTE(review): no null check on fStatus, unlike the two getters above
183 };
184 
185 #endif
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
TNtuple * GetProgressPerf(Bool_t steal=kFALSE)
virtual Float_t GetCurrentRate(Bool_t &all)
long long Long64_t
Definition: RtypesCore.h:69
virtual void MarkBad(TSlave *, TProofProgressStatus *, TList **)
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
float Float_t
Definition: RtypesCore.h:53
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
Long64_t GetReadCalls() const
#define BIT(n)
Definition: Rtypes.h:120
Long64_t GetBytesRead() const
Double_t GetProcTime() const
Float_t GetProcTime() const
const char * GetOrdinal() const
Definition: TSlave.h:135
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
Basic time type with millisecond precision.
Definition: TTime.h:29
Long64_t GetEntries() const
virtual void SetInitTime()
Set the initialization time.
Double_t GetCumProcTime() const
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
This class generates packets to be processed on PROOF worker servers.
Definition: TPacketizer.h:41
Manages an element of a TDSet.
Definition: TDSet.h:68
const char * GetName() const
Returns name of object.
TStopwatch timer
Definition: pirndm.C:37
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry with num entries...
#define ClassDef(name, id)
Definition: Rtypes.h:254
void SetProgressStatus(TProofProgressStatus *st)
TVirtualPacketizer(TList *input, TProofProgressStatus *st=0)
Constructor.
TMap * GetSlaveStats() const
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Double_t GetRate() const
TProofProgressStatus * fProgressStatus
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Long64_t GetTotalEntries() const
A doubly linked list.
Definition: TList.h:47
Double_t GetCurrentRate() const
Get current rate. Return the average rate if the current is not defined.
void SetTotalEntries(Long64_t ent)
TRandom2 r(17)
A simple TTree restricted to a list of float variables only.
Definition: TNtuple.h:30
virtual Int_t AssignWork(TDSet *, Long64_t, Long64_t)
TLine * l
Definition: textangle.C:4
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:57
virtual ~TVirtualPacketizer()
Destructor.
Long64_t GetEntries(Bool_t tree, TDSetElement *e)
Get entries.
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
double Double_t
Definition: RtypesCore.h:55
Long64_t GetEntriesProcessed() const
EStatusBits
Definition: TObject.h:55
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
void SetFailedPackets(TList *list)
Bool_t IsValid() const
Long64_t GetReadCalls() const
Mother of all ROOT objects.
Definition: TObject.h:37
Long64_t GetBytesRead() const
void operator=(const TVirtualPacketizer &)
virtual Int_t GetActiveWorkers()
Definition: tree.py:1
TProofProgressStatus * GetStatus()
Definition: first.py:1
TList * GetConfigParams(Bool_t steal=kFALSE)
TProofProgressStatus * GetProgressStatus()
Class describing a PROOF worker server.
Definition: TSlave.h:50
Container class for processing statistics.
const Bool_t kTRUE
Definition: Rtypes.h:91
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const Int_t n
Definition: legend1.C:16
Float_t GetInitTime() const
const char * Data() const
Definition: TString.h:349
virtual Int_t AddWorkers(TList *workers)
Adds new workers.