Logo ROOT   6.10/09
Reference Guide
TPacketizerAdaptive.h
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TPacketizerAdaptive
13 #define ROOT_TPacketizerAdaptive
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TPacketizerAdaptive //
18 // //
19 // This packetizer is based on TPacketizer but uses different //
20 // load-balancing algorithms and data structures. //
21 // Two main improvements in the load-balancing strategy: //
22 // - First one was to change the order in which the files are assigned //
23 // to the computing nodes in such a way that network transfers are //
24 // evenly distributed in the query time. Transfer of the remote files //
25 // was often becoming a bottleneck at the end of a query. //
26 // - The other improvement is the use of time-based packet size. We //
27 // measure the processing rate of all the nodes and calculate the //
28 // packet size, so that it takes certain amount of time. In this way //
29 // packetizer prevents the situation where the query can't finish //
30 // because of one slow node. //
31 // //
32 // The data structures: TFileStat, TFileNode and TSlaveStat are //
33 // enriched + changed and TFileNode::Compare method is changed. //
34 // //
35 //////////////////////////////////////////////////////////////////////////
36 
37 #include "TVirtualPacketizer.h"
38 
39 
40 class TMessage;
41 class TTree;
42 class TMap;
43 class TNtupleD;
44 class TProofStats;
45 class TRandom;
46 class TSortedList;
47 
49 
50 public: // public because of Sun CC bug
51  class TFileNode;
52  class TFileStat;
53  class TSlaveStat;
54 
55 private:
56  TList *fFileNodes; // nodes with files
57  TList *fUnAllocated; // nodes with unallocated files
58  TList *fActive; // nodes with unfinished files
59  Int_t fMaxPerfIdx; // maximum of our slaves' performance index
60  TList *fPartitions; // list of partitions on nodes
61 
62  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed
63 
64  Bool_t fCachePacketSync; // control synchronization of cache and packet sizes
65  Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync
66 
67  Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
68  Long64_t fNEventsOnRemLoc; // number of events in currently
69  // unalloc files on non-worker loc.
70  Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be
71  // to open their local files (1 means indifferent)
72  Bool_t fForceLocal; // if 1 - eliminate the remote processing
73 
74  Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid
75  // warnings from backward compatibility support)
76  Int_t fPacketAsAFraction; // used to calculate the packet size
77  // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
78  // fPacketAsAFraction can be interpreted as follows:
79  // assuming all slaves have equal processing rate, packet size
80  // is (#events processed by 1 slave) / fPacketSizeAsAFraction.
81  // It can be set with PROOF_PacketAsAFraction in input list.
82  Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
83  Int_t fTryReassign; // Controls attempts to reassign packets (0 == no reassignment)
84 
86  TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation, will generate
87  void InitStats(); // initialise the stats
88  void operator=(const TPacketizerAdaptive&); // error on accidental usage
89 
90  TFileNode *NextNode();
91  void RemoveUnAllocNode(TFileNode *);
92 
93  TFileNode *NextActiveNode();
94  void RemoveActiveNode(TFileNode *);
95 
96  TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0);
97  TFileStat *GetNextActive();
98  void RemoveActive(TFileStat *file);
99 
100  void Reset();
101  void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
102  Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
103  void SplitPerHost(TList *elements, TList **listOfMissingFiles);
104 
105 public:
106  TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num,
107  TList *input, TProofProgressStatus *st);
108  virtual ~TPacketizerAdaptive();
109 
111  Double_t latency, TList **listOfMissingFiles = 0);
114  Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
116  void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);
117 
119 
120  ClassDef(TPacketizerAdaptive,0) //Generate work packets for parallel processing
121 };
122 
123 #endif
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
void Reset()
Reset the internal data structure for packet distribution.
long long Long64_t
Definition: RtypesCore.h:69
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
float Float_t
Definition: RtypesCore.h:53
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to...
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:151
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check the existence of the file/dir/tree and get the number of entries.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get an estimate of the number of processed entries and bytes read at time t, based on the numbers alr...
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TPacketizerAdaptive()
Destructor.
Int_t GetActiveWorkers()
Return the number of workers still processing.
TFileNode * NextNode()
Get next node which has unallocated files.
Manages an element of a TDSet.
Definition: TDSet.h:66
void operator=(const TPacketizerAdaptive &)
TFileStat * GetNextActive()
Get next active file.
#define ClassDef(name, id)
Definition: Rtypes.h:297
This is the base class for the ROOT Random number generators.
Definition: TRandom.h:27
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
A sorted doubly linked list.
Definition: TSortedList.h:28
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get the next packet. A meaningful difference from TPacketizer is that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
A doubly linked list.
Definition: TList.h:43
TRandom2 r(17)
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
A file in listOfMissingFiles can appear several times; fixing that would require a TDSetElement::Merge method.
const Bool_t kFALSE
Definition: RtypesCore.h:92
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
double Double_t
Definition: RtypesCore.h:55
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get the next unallocated file from 'node' or other nodes: first try 'node'.
TSortedList * fFilesToProcess
Float_t GetCurrentRate(Bool_t &all)
Get an estimate of the current rate, obtained by summing the current rates of the active workers.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
Mother of all ROOT objects.
Definition: TObject.h:37
Definition: file.py:1
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
A TTree object has a header with a name and a title.
Definition: TTree.h:78
Definition: first.py:1
Class describing a PROOF worker server.
Definition: TSlave.h:46
Container class for processing statistics.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.