Logo ROOT  
Reference Guide
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Jan Iwaszkiewicz 11/12/06
5 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
12#ifndef ROOT_TPacketizerAdaptive
13#define ROOT_TPacketizerAdaptive
16// //
17// TPacketizerAdaptive //
18// //
19// This packetizer is based on TPacketizer but uses different //
20// load-balancing algorithms and data structures. //
21// Two main improvements in the load-balancing strategy: //
22// - First one was to change the order in which the files are assigned //
23// to the computing nodes in such a way that network transfers are //
24// evenly distributed in the query time. Transfer of the remote files //
25// was often becoming a bottleneck at the end of a query. //
26// - The other improvement is the use of time-based packet size. We //
27// measure the processing rate of all the nodes and calculate the //
28// packet size, so that it takes certain amount of time. In this way //
29// packetizer prevents the situation where the query can't finish //
30// because of one slow node. //
31// //
32// The data structures: TFileStat, TFileNode and TSlaveStat are //
33// enriched + changed and TFileNode::Compare method is changed. //
34// //
37#include "TVirtualPacketizer.h"
40class TMessage;
41class TTree;
42class TMap;
43class TNtupleD;
44class TProofStats;
45class TRandom;
46class TSortedList;
50public: // public because of Sun CC bug
51 class TFileNode;
52 class TFileStat;
53 class TSlaveStat;
56 TList *fFileNodes; // nodes with files
57 TList *fUnAllocated; // nodes with unallocated files
58 TList *fActive; // nodes with unfinished files
59 Int_t fMaxPerfIdx; // maximum of our slaves' performance index
60 TList *fPartitions; // list of partitions on nodes
62 TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed
64 Bool_t fCachePacketSync; // control synchronization of cache and packet sizes
65 Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync
67 Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
68 Long64_t fNEventsOnRemLoc; // number of events in currently
69 // unalloc files on non-worker loc.
70 Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be
71 // to open their local files (1 means indifferent)
72 Bool_t fForceLocal; // if 1 - eliminate the remote processing
74 Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid
75 // warnings from backward compatibility support)
76 Int_t fPacketAsAFraction; // used to calculate the packet size
77 // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
78 // fPacketAsAFraction can be interpreted as follows:
79 // assuming all slaves have equal processing rate, packet size
80 // is (#events processed by 1 slave) / fPacketAsAFraction.
81 // It can be set with PROOF_PacketAsAFraction in input list.
82 Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
83 Int_t fTryReassign; // Controls attempts to reassign packets (0 == no reassignment)
86 TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation, will generate
87 void InitStats(); // initialise the stats
88 void operator=(const TPacketizerAdaptive&); // error on accidental usage
96 TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0);
100 void Reset();
101 void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
102 Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
103 void SplitPerHost(TList *elements, TList **listOfMissingFiles);
107 TList *input, TProofProgressStatus *st);
108 virtual ~TPacketizerAdaptive();
111 Double_t latency, TList **listOfMissingFiles = 0);
114 Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
116 void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);
120 ClassDef(TPacketizerAdaptive,0) //Generate work packets for parallel processing
ROOT::R::TRInterface & r
Definition: Object.C:4
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:45
const Bool_t kFALSE
Definition: RtypesCore.h:101
long Long_t
Definition: RtypesCore.h:54
bool Bool_t
Definition: RtypesCore.h:63
double Double_t
Definition: RtypesCore.h:59
long long Long64_t
Definition: RtypesCore.h:80
float Float_t
Definition: RtypesCore.h:57
#define ClassDef(name, id)
Definition: Rtypes.h:325
Manages an element of a TDSet.
Definition: TDSet.h:66
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
A doubly linked list.
Definition: TList.h:44
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
A simple TTree restricted to a list of double variables only.
Definition: TNtupleD.h:28
Mother of all ROOT objects.
Definition: TObject.h:37
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to...
virtual ~TPacketizerAdaptive()
void operator=(const TPacketizerAdaptive &)
TFileNode * NextNode()
Get next node which has unallocated files.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; a meaningful difference to TPacketizer is the fact that this packetizer,...
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
TPacketizerAdaptive(const TPacketizerAdaptive &)
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetActiveWorkers()
Return the number of workers still processing.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
This is the base class for the ROOT Random number generators.
Definition: TRandom.h:27
Class describing a PROOF worker server.
Definition: TSlave.h:46
A sorted doubly linked list.
Definition: TSortedList.h:28
A TTree represents a columnar dataset.
Definition: TTree.h:79
The packetizer is a load balancing object created for each query.
static constexpr double s
Definition: file.py:1
Definition: first.py:1