66 class TPacketizer::TFileStat :
public TObject {
77 Bool_t IsDone()
const {
return fIsDone;}
78 void SetDone() {fIsDone =
kTRUE;}
79 TFileNode *GetNode()
const {
return fNode;}
81 Long64_t GetNextEntry()
const {
return fNextEntry;}
82 void MoveNextEntry(
Long64_t step) {fNextEntry += step;}
86 TPacketizer::TFileStat::TFileStat(TFileNode *node,
TDSetElement *elem)
87 : fIsDone(
kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
94 class TPacketizer::TFileNode :
public TObject {
106 TFileNode(
const char *
name);
107 ~TFileNode() {
delete fFiles;
delete fActFiles; }
109 void IncMySlaveCnt() { fMySlaveCnt++; }
110 void IncSlaveCnt(
const char *slave) {
if (fNodeName != slave) fSlaveCnt++; }
111 void DecSlaveCnt(
const char *slave) {
if (fNodeName != slave) fSlaveCnt--;
R__ASSERT(fSlaveCnt >= 0); }
112 Int_t GetSlaveCnt()
const {
return fMySlaveCnt + fSlaveCnt;}
113 Int_t GetNumberOfActiveFiles()
const {
return fActFiles->
GetSize(); }
116 const char *
GetName()
const {
return fNodeName.
Data(); }
120 TFileStat *
f =
new TFileStat(
this,elem);
122 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->
First();
125 TFileStat *GetNextUnAlloc()
127 TObject *next = fUnAllocFileNext;
131 fActFiles->
Add(next);
132 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
135 fUnAllocFileNext = fFiles->
After(fUnAllocFileNext);
138 return (TFileStat *) next;
141 TFileStat *GetNextActive()
145 if (fActFileNext != 0) {
146 fActFileNext = fActFiles->
After(fActFileNext);
147 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
150 return (TFileStat *) next;
153 void RemoveActive(TFileStat *
file)
155 if (fActFileNext == file) fActFileNext = fActFiles->
After(file);
157 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
164 const TFileNode *obj =
dynamic_cast<const TFileNode*
>(other);
166 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
170 Int_t myVal = GetSlaveCnt();
171 Int_t otherVal = obj->GetSlaveCnt();
172 if (myVal < otherVal) {
174 }
else if (myVal > otherVal) {
183 std::cout <<
"OBJ: " << IsA()->GetName() <<
"\t" << fNodeName
184 <<
"\tMySlaveCount " << fMySlaveCnt
185 <<
"\tSlaveCount " << fSlaveCnt << std::endl;
190 fUnAllocFileNext = fFiles->
First();
199 TPacketizer::TFileNode::TFileNode(
const char *
name)
200 : fNodeName(name), fFiles(new
TList), fUnAllocFileNext(0),fActFiles(new
TList),
201 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
217 TFileNode *fFileNode;
222 TSlaveStat(
TSlave *slave);
225 TFileNode *GetFileNode()
const {
return fFileNode; }
227 void SetFileNode(TFileNode *node) { fFileNode = node; }
231 TPacketizer::TSlaveStat::TSlaveStat(
TSlave *slave)
232 : fFileNode(0), fCurFile(0), fCurElem(0)
241 TPacketizer::TSlaveStat::~TSlaveStat()
255 fStatus->SetLastProcTime(0.);
263 Error(
"AddProcessed",
"status arg undefined");
279 PDB(kPacketizer,1)
Info(
"TPacketizer",
"Enter (first %lld, num %lld)", first, num);
292 Error(
"TPacketizer",
"No progress status");
298 if (maxSlaveCnt < 0) {
299 Warning(
"TPacketizer",
"PROOF_MaxSlavesPerNode must be positive");
308 Warning(
"TPacketizer",
"PROOF_MaxSlavesPerNode must be positive");
311 maxSlaveCnt = (
Long_t) mxslcnt;
316 maxSlaveCnt =
gEnv->
GetValue(
"Packetizer.MaxWorkersPerNode", slaves->GetSize());
319 if (maxSlaveCnt > 0) {
322 Info(
"TPacketizer",
"setting max number of workers per node to %ld",
fMaxSlaveCnt);
363 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
371 node =
new TFileNode(host);
383 Info(
"TPacketizer",
"Initial number of workers: %d", nwrks);
388 Int_t validateMode = 0;
394 "processing subset of entries: validating by file? %s", byfile ?
"yes":
"no");
407 PDB(kPacketizer,2)
Info(
"TPacketizer",
"processing range: first %lld, num %lld", first, num);
427 Info(
"TPacketizer",
" --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
431 if (cur + eNum < first) {
434 Info(
"TPacketizer",
" --> skip element cur %lld", cur);
439 if (num != -1 && (first+num <= cur)) {
442 Info(
"TPacketizer",
" --> drop element cur %lld", cur);
447 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
452 e->
SetFirst( eFirst + (first - cur) );
455 Info(
"TPacketizer",
" --> adjust start %lld and end %lld",
456 eFirst + (first - cur), first + num - cur);
459 if (num != -1 && (first+num <= cur+eNum)) {
464 Info(
"TPacketizer",
" --> adjust end %lld", first + num - cur);
471 Info(
"TPacketizer",
" --> increment 'cur' by %lld", eNum);
486 eNum = evl ? evl->
GetN() : eNum;
492 Info(
"TPacketizer",
" --> next cur %lld", cur);
508 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
516 node =
new TFileNode( host );
527 Info(
"TPacketizer",
"processing %lld entries in %d files on %d hosts",
537 Info(
"TPacketizer",
"no valid or non-empty file found: setting invalid");
550 Long_t packetAsAFraction = 20;
552 Info(
"Process",
"using alternate fraction of query time as a packet Size: %ld",
576 PDB(kPacketizer,1)
Info(
"TPacketizer",
"Return");
601 Error(
"AddWorkers",
"Null list of new workers!");
609 while (( sl = dynamic_cast<TSlave*>(next()) ))
641 file = node->GetNextUnAlloc();
645 file = node->GetNextUnAlloc();
667 std::cout <<
"TPacketizer::NextUnAllocNode()" << std::endl;
673 PDB(kPacketizer,1)
Info(
"NextUnAllocNode",
"reached workers per node limit (%ld)",
698 file = node->GetNextActive();
712 Printf(
"TPacketizer::NextActiveNode : ----------------------");
719 Info(
"NextActiveNode",
"reached workers per node limit (%ld)",
fMaxSlaveCnt);
731 TFileNode *node = file->GetNode();
733 node->RemoveActive(file);
757 while ((fn = (TFileNode*) files.
Next()) != 0) {
763 while ((key = slaves.
Next()) != 0) {
768 slstat->SetFileNode(fn);
771 slstat->fCurFile = 0;
773 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
796 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
801 Info(
"ValidateFiles",
802 "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->
GetSocket());
813 TString msg(
"Validating files");
830 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
837 if ( (node = slstat->GetFileNode()) != 0 ) {
840 slstat->SetFileNode(0);
853 slstat->fCurFile = file;
856 if (entries < 0 || strlen(elem->
GetTitle()) <= 0) {
858 file->GetNode()->IncSlaveCnt(slstat->GetName());
865 s->GetSocket()->Send( m );
868 Info(
"ValidateFiles",
869 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
870 s->GetOrdinal(), s->GetName(), s->GetSocket(),
881 Error(
"ValidateFiles",
882 "first (%lld) higher then number of entries (%lld) in %s",
885 slstat->fCurFile->SetDone();
889 if (elem->
GetNum() == -1) {
892 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of" 898 Info(
"ValidateFiles",
899 "found elem '%s' with %lld entries", elem->
GetFileName(), entries);
914 if (byfile && maxent > 0 && totent > 0) {
916 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
917 if (nrestf <= 0 && maxent > totent) nrestf = 1;
920 Info(
"ValidateFiles",
"{%lld, %lld, %lld): needs to validate %lld more files",
921 maxent, totent, nopenf, nrestf);
923 while ((slm = (
TSlave *) si.
Next()) && nrestf--) {
929 Info(
"ValidateFiles",
"no need to validate more files");
938 Info(
"ValidateFiles",
"waiting for %d workers:", mon.
GetActive());
942 while ((s = (
TSocket*) next())) {
943 Info(
"ValidateFiles",
"found sck: %p", s);
954 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
961 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
966 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
975 if ( sock->
Recv(reply) <= 0 ) {
979 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
989 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1002 slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1005 (*reply) >> entries;
1010 (*reply) >> objname;
1015 if ( entries > 0 ) {
1023 Error(
"ValidateFiles",
"first (%lld) higher then number of entries (%lld) in %s",
1027 slavestat->fCurFile->SetDone();
1032 if ( e->
GetNum() == -1 ) {
1035 Error(
"ValidateFiles",
1036 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1067 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated", totent);
1070 if (maxent < 0 || ((totent < maxent) && !byfile))
1091 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1093 el->SetTDSetOffset(offset);
1107 if ( slstat == 0 )
return 0;
1109 return slstat->GetEntriesProcessed();
1124 while ((key = nxw()) != 0) {
1126 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1128 currate += slstat->GetProgressStatus()->GetCurrentRate();
1158 if ( slstat->fCurElem != 0 ) {
1159 Double_t latency = 0., proctime = 0., proccpu = 0.;
1163 Long64_t numev = slstat->fCurElem->GetNum();
1176 numev = status->
GetEntries() - slstat->GetEntriesProcessed();
1177 progress = slstat->AddProcessed(status);
1182 totev = status->GetEntries();
1188 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
1191 (*r) >> latency >> proctime >> proccpu;
1198 numev = totev - slstat->GetEntriesProcessed();
1199 if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1200 if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1201 if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1210 Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1212 numev, latency, proctime, proccpu, bytesRead);
1216 numev, latency, proctime, proccpu, bytesRead);
1218 slstat->fCurElem = 0;
1224 firstPacket =
kTRUE;
1234 TFileStat *file = slstat->fCurFile;
1236 if ( file != 0 && file->IsDone() ) {
1237 file->GetNode()->DecSlaveCnt(slstat->GetName());
1240 file->GetElement()->GetFileName(),
kFALSE);
1244 slstat->fCurFile = file;
1249 if (slstat->GetFileNode() != 0) {
1252 slstat->SetFileNode(0);
1266 if (!file)
return 0;
1268 slstat->fCurFile = file;
1269 file->GetNode()->IncSlaveCnt(slstat->GetName());
1272 file->GetNode()->GetName(),
1273 file->GetElement()->GetFileName(),
kTRUE);
1280 if (num < 1) num = 1;
1285 if ( first + num >= last ) {
1293 file->MoveNextEntry(num);
1299 slstat->fCurElem->SetEntryList(base->
GetEntryList(), first, num);
1310 return slstat->fCurElem;
1321 while ((key = nxw())) {
1323 if (wrkstat && wrkstat->fCurFile) actw++;
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
virtual Bool_t IsValid() const
Long64_t GetTDSetOffset() const
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual Long64_t GetN() const
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Int_t GetActiveWorkers()
Return the number of workers still processing.
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
void SetTDSetOffset(Long64_t offset)
This class represents a WWW compatible URL.
TObject * GetParameter(const char *par) const
Get specified parameter.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
const char * GetProtocol() const
This class implements a data set to be used for PROOF processing.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TSocket * GetSocket() const
Double_t GetProcTime() const
virtual Int_t GetEntries() const
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Long64_t GetFirst() const
const char * GetOrdinal() const
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
virtual ~TPacketizer()
Destructor.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
const char * GetName() const
Returns name of object.
virtual void DeActivateAll()
De-activate all activated sockets.
Long64_t GetEntries() const
Int_t GetProtocol() const
TList * GetListOfElements() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
This class generates packets to be processed on PROOF worker servers.
Manages an element of a TDSet.
const char * GetHost() const
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry, with num entries...
TSocket * GetSocket() const
virtual void DeActivate(TSocket *sock)
De-activate a socket.
void IncEntries(Long64_t entries=1)
void IncBytesRead(Long64_t bytesRead)
virtual Bool_t HandleTimer(TTimer *timer)
Execute action in response to a timer timing out.
virtual Int_t GetN() const
const char * GetDirectory() const
Return directory where to look for object.
void Reset()
Reset the internal datastructure for packet distribution.
TSocket * Select()
Return pointer to socket for which an event is waiting.
TProofProgressStatus * fProgressStatus
const char * GetObjName() const
void SetLastEntries(Long64_t entries)
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
const char * GetName() const
Return static name for TOutputListSelectorDataMap objects.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual Bool_t IsSortable() const
R__EXTERN TSystem * gSystem
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
char * Form(const char *fmt,...)
TFileNode * NextUnAllocNode()
Get next unallocated node.
A TEventList object is a list of selected events (entries) in a TTree.
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
void Reset(Detail::TBranchProxy *x)
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
void SetHost(const char *host)
The packetizer is a load balancing object created for each query.
R__EXTERN TProof * gProof
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
void Add(THist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const THist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
TObject * GetEntryList() const
virtual const char * HostName()
Return the system's host name.
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
This class controls a Parallel ROOT Facility, PROOF, cluster.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
void SetNum(Long64_t num)
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
const char * GetFileName() const
virtual void Clear(Option_t *option="")
Remove all objects from the list.
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Mother of all ROOT objects.
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Long64_t GetBytesRead() const
const char * GetDataSet() const
R__EXTERN TProofServ * gProofServ
virtual void Add(TObject *obj)
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
Class describing a PROOF worker server.
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
void SetFirst(Long64_t first)
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
A List of entry numbers in a TTree or TChain.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual const char * GetTitle() const
Returns title of object.
const char * Data() const
TFileNode * NextActiveNode()
Get next active node.