Logo ROOT   6.08/07
Reference Guide
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
4 /*************************************************************************
5  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
12 #ifndef ROOT_TPacketizerAdaptive
13 #define ROOT_TPacketizerAdaptive
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TPacketizerAdaptive //
18 // //
19 // This packetizer is based on TPacketizer but uses different //
20 // load-balancing algorithms and data structures. //
21 // Two main improvements in the load-balancing strategy: //
22 // - First one was to change the order in which the files are assigned //
23 // to the computing nodes in such a way that network transfers are //
24 // evenly distributed in the query time. Transfer of the remote files //
25 // was often becoming a bottleneck at the end of a query. //
26 // - The other improvement is the use of time-based packet size. We //
27 // measure the processing rate of all the nodes and calculate the //
28 // packet size, so that it takes certain amount of time. In this way //
29 // packetizer prevents the situation where the query can't finish //
30 // because of one slow node. //
31 // //
32 // The data structures: TFileStat, TFileNode and TSlaveStat are //
33 // enriched + changed and TFileNode::Compare method is changed. //
34 // //
35 //////////////////////////////////////////////////////////////////////////
37 #ifndef ROOT_TVirtualPacketizer
38 #include "TVirtualPacketizer.h"
39 #endif
42 class TMessage;
43 class TTree;
44 class TMap;
45 class TNtupleD;
46 class TProofStats;
47 class TRandom;
48 class TSortedList;
52 public: // public because of Sun CC bug
53  class TFileNode;
54  class TFileStat;
55  class TSlaveStat;
57 private:
58  TList *fFileNodes; // nodes with files
59  TList *fUnAllocated; // nodes with unallocated files
60  TList *fActive; // nodes with unfinished files
61  Int_t fMaxPerfIdx; // maximum of our slaves' performance index
62  TList *fPartitions; // list of partitions on nodes
64  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed
66  Bool_t fCachePacketSync; // control synchronization of cache and packet sizes
67  Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync
69  Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
70  Long64_t fNEventsOnRemLoc; // number of events in currently
71  // unalloc files on non-worker loc.
72  Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be
73  // to open their local files (1 means indifferent)
74  Bool_t fForceLocal; // if 1 - eliminate the remote processing
76  Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid
77  // warnings from backward compatibility support)
78  Int_t fPacketAsAFraction; // used to calculate the packet size
79  // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
80  // fPacketAsAFraction can be interpreted as follows:
81  // assuming all slaves have equal processing rate, packet size
82  // is (#events processed by 1 slave) / fPacketSizeAsAFraction.
83  // It can be set with PROOF_PacketAsAFraction in input list.
84  Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
85  Int_t fTryReassign; // Controls attempts to reassign packets (0 == no reassignment)
88  TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation, will generate
89  void InitStats(); // initialise the stats
90  void operator=(const TPacketizerAdaptive&); // error on accidental usage
92  TFileNode *NextNode();
93  void RemoveUnAllocNode(TFileNode *);
95  TFileNode *NextActiveNode();
96  void RemoveActiveNode(TFileNode *);
98  TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0);
99  TFileStat *GetNextActive();
100  void RemoveActive(TFileStat *file);
102  void Reset();
103  void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
104  Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
105  void SplitPerHost(TList *elements, TList **listOfMissingFiles);
107 public:
108  TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num,
109  TList *input, TProofProgressStatus *st);
110  virtual ~TPacketizerAdaptive();
113  Double_t latency, TList **listOfMissingFiles = 0);
116  Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
118  void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);
122  ClassDef(TPacketizerAdaptive,0) //Generate work packets for parallel processing
123 };
125 #endif
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
void Reset()
Reset the internal data structure for packet distribution.
long long Long64_t
Definition: RtypesCore.h:69
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:30
float Float_t
Definition: RtypesCore.h:53
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree an get number of entries.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual ~TPacketizerAdaptive()
Int_t GetActiveWorkers()
Return the number of workers still processing.
TFileNode * NextNode()
Get next node which has unallocated files.
Manages an element of a TDSet.
Definition: TDSet.h:68
void operator=(const TPacketizerAdaptive &)
TFileStat * GetNextActive()
Get next active file.
#define ClassDef(name, id)
Definition: Rtypes.h:254
This is the base class for the ROOT Random number generators.
Definition: TRandom.h:31
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
A sorted doubly linked list.
Definition: TSortedList.h:30
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; A meaningfull difference to TPacketizer is the fact that this packetizer, for each worker, tries to predict whether the worker will finish processing it's local files before the end of the query.
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
A doubly linked list.
Definition: TList.h:47
TRandom2 r(17)
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
void InitStats()
(re)initialise the statistics called at the begining or after a worker dies.
long Long_t
Definition: RtypesCore.h:50
The packetizer is a load balancing object created for each query.
double Double_t
Definition: RtypesCore.h:55
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
TSortedList * fFilesToProcess
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
Mother of all ROOT objects.
Definition: TObject.h:37
Definition: file.py:1
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
A TTree object has a header with a name and a title.
Definition: TTree.h:98
Definition: first.py:1
Class describing a PROOF worker server.
Definition: TSlave.h:50
Container class for processing statistics.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.