77 class TPacketizerAdaptive::TFileStat :
public TObject {
88 Bool_t IsDone()
const {
return fIsDone;}
90 void SetDone() {fIsDone =
kTRUE;}
91 TFileNode *GetNode()
const {
return fNode;}
93 Long64_t GetNextEntry()
const {
return fNextEntry;}
94 void MoveNextEntry(
Long64_t step) {fNextEntry += step;}
101 const TFileStat *fst =
dynamic_cast<const TFileStat*
>(
obj);
102 if (fst && GetElement() && fst->GetElement()) {
103 Long64_t ent = GetElement()->GetNum();
104 Long64_t entfst = fst->GetElement()->GetNum();
105 if (ent > 0 && entfst > 0) {
108 }
else if (ent < entfst) {
120 Printf(
"TFileStat: %s %lld", fElement ? fElement->
GetName() :
"---",
121 fElement ? fElement->
GetNum() : -1);
125 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node,
TDSetElement *elem,
TList *files)
126 : fIsDone(
kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
129 if (files) files->
Add(
this);
135 class TPacketizerAdaptive::TFileNode :
public TObject {
158 ~TFileNode() {
delete fFiles;
delete fActFiles; }
160 void IncMySlaveCnt() { fMySlaveCnt++; }
161 Int_t GetMySlaveCnt()
const {
return fMySlaveCnt; }
162 void IncExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt++; }
163 void DecExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt--;
R__ASSERT(fExtSlaveCnt >= 0); }
164 Int_t GetSlaveCnt()
const {
return fMySlaveCnt + fExtSlaveCnt; }
165 void IncRunSlaveCnt() { fRunSlaveCnt++; }
166 void DecRunSlaveCnt() { fRunSlaveCnt--;
R__ASSERT(fRunSlaveCnt >= 0); }
167 Int_t GetRunSlaveCnt()
const {
return fRunSlaveCnt; }
168 Int_t GetExtSlaveCnt()
const {
return fExtSlaveCnt; }
169 Int_t GetNumberOfActiveFiles()
const {
return fActFiles->
GetSize(); }
174 Long64_t GetProcessed()
const {
return fProcessed; }
178 Long64_t GetEventsLeftPerSlave()
const
179 {
return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
181 const char *
GetName()
const {
return fNodeName.
Data(); }
182 Long64_t GetNEvents()
const {
return fEvents; }
189 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
190 Printf(
"+++ TFileNode: %s +++", fNodeName.
Data());
191 Printf(
"+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
192 Printf(
"+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
194 if (fFiles && fFiles->
GetSize() > 0) {
196 while ((fs = (TFileStat *) nxf())) {
197 if ((e = fs->GetElement())) {
198 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->
GetName(),
201 Printf(
"+++ #%d: no element! ", ++nn);
205 Printf(
"+++ Active files: %d ", fActFiles ? fActFiles->
GetSize() : 0);
206 if (fActFiles && fActFiles->
GetSize() > 0) {
207 TIter nxaf(fActFiles);
208 while ((fs = (TFileStat *) nxaf())) {
209 if ((e = fs->GetElement())) {
210 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->
GetName(),
213 Printf(
"+++ #%d: no element! ", ++nn);
217 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
223 TFileStat *f =
new TFileStat(
this, elem, files);
225 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->
First();
230 TObject *next = fUnAllocFileNext;
234 fActFiles->
Add(next);
235 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
238 fUnAllocFileNext = fFiles->
After(fUnAllocFileNext);
240 return (TFileStat *)
next;
247 if (fActFileNext != 0) {
248 fActFileNext = fActFiles->
After(fActFileNext);
249 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
252 return (TFileStat *)
next;
257 if (fActFileNext == file) fActFileNext = fActFiles->
After(file);
259 if (fFilesToProcess) fFilesToProcess->
Remove(file);
260 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
271 const TFileNode *obj =
dynamic_cast<const TFileNode*
>(other);
273 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
279 if (fStrategy == 1) {
281 Int_t myVal = GetRunSlaveCnt();
282 Int_t otherVal = obj->GetRunSlaveCnt();
283 if (myVal < otherVal) {
285 }
else if (myVal > otherVal) {
289 if ((fEvents - fProcessed) >
290 (obj->GetNEvents() - obj->GetProcessed())) {
297 Int_t myVal = GetSlaveCnt();
298 Int_t otherVal = obj->GetSlaveCnt();
299 if (myVal < otherVal) {
301 }
else if (myVal > otherVal) {
311 fUnAllocFileNext = fFiles->
First();
321 TPacketizerAdaptive::TFileNode::TFileNode(
const char *name,
Int_t strategy,
TSortedList *files)
322 : fNodeName(name), fFiles(new
TList), fUnAllocFileNext(0),
323 fActFiles(new
TList), fActFileNext(0), fMySlaveCnt(0),
324 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
325 fStrategy(strategy), fFilesToProcess(files)
340 TFileNode *fFileNode;
348 TSlaveStat(
TSlave *slave);
350 TFileNode *GetFileNode()
const {
return fFileNode; }
353 TFileStat *GetCurFile() {
return fCurFile; }
354 void SetFileNode(TFileNode *node) { fFileNode = node; }
358 return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
359 Int_t GetLocalEventsLeft() {
360 return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
361 TList *GetProcessedSubSet() {
return fDSubSet; }
369 TPacketizerAdaptive::TSlaveStat::TSlaveStat(
TSlave *slave)
370 : fFileNode(0), fCurFile(0), fCurElem(0),
371 fCurProcessed(0), fCurProcTime(0)
373 fDSubSet =
new TList();
380 if (strcmp(slave->
ClassName(),
"TSlaveLite")) {
383 if (fWrkFQDN.Contains(
"localhost") || fWrkFQDN ==
"127.0.0.1")
387 Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.
Data());
405 Error(
"UpdateRates",
"no status object!");
408 if (fCurFile->IsDone()) {
428 if (st && fDSubSet && fCurElem) {
431 fDSubSet->
Add(fCurElem);
435 Error(
"AddProcessed",
"processed subset of current elem undefined");
452 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
453 "enter (first %lld, num %lld)", first, num);
471 Error(
"TPacketizerAdaptive",
"No progress status");
479 cpsync =
gEnv->
GetValue(
"Packetizer.CachePacketSync", 1);
499 Info(
"TPacketizerAdaptive",
"using the basic strategy of TPacketizer");
500 }
else if (strategy != 1) {
501 Warning(
"TPacketizerAdaptive",
"unsupported strategy index (%d): ignore", strategy);
506 if (maxSlaveCnt < 0) {
507 Info(
"TPacketizerAdaptive",
508 "The value of PROOF_MaxSlavesPerNode must be positive");
516 Info(
"TPacketizerAdaptive",
517 "The value of PROOF_MaxSlavesPerNode must be positive");
520 maxSlaveCnt = (
Long_t) mxslcnt;
525 maxSlaveCnt =
gEnv->
GetValue(
"Packetizer.MaxWorkersPerNode", 0);
526 if (maxSlaveCnt > 0) {
528 Info(
"TPacketizerAdaptive",
"Setting max number of workers per node to %ld",
538 Int_t forceLocal = 0;
543 Info(
"TPacketizerAdaptive",
544 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
553 Int_t packetAsAFraction = 0;
555 if (packetAsAFraction > 0) {
557 Info(
"TPacketizerAdaptive",
558 "using alternate fraction of query time as a packet size: %d",
561 Info(
"TPacketizerAdaptive",
"packetAsAFraction parameter must be higher than 0");
566 Int_t tryReassign = 0;
568 tryReassign =
gEnv->
GetValue(
"Packetizer.TryReassign", 0);
571 Info(
"TPacketizerAdaptive",
"failed packets will be re-assigned");
603 partitionsStr =
gEnv->
GetValue(
"Packetizer.Partitions",
"");
604 if (!partitionsStr.
IsNull()) {
605 Info(
"TPacketizerAdaptive",
"Partitions: %s", partitionsStr.
Data());
606 partitions = partitionsStr.
Tokenize(
",");
638 if (host.Contains(
"localhost") || host ==
"127.0.0.1") {
646 TIter iString(partitions);
665 node =
new TFileNode(nodeStr, fStrategy, fFilesToProcess);
668 Info(
"TPacketizerAdaptive",
"creating new node '%s' or the element", nodeStr.
Data());
671 Info(
"TPacketizerAdaptive",
"adding element to existing node '%s'", nodeStr.
Data());
691 Int_t validateMode = 0;
696 Info(
"TPacketizerAdaptive",
697 "processing subset of entries: validating by file? %s", byfile ?
"yes":
"no");
712 Info(
"TPacketizerAdaptive",
713 "processing range: first %lld, num %lld", first, num);
727 Info(
"TPacketizerAdaptive",
"processing element '%s'", e->
GetFileName());
729 Info(
"TPacketizerAdaptive",
730 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->
GetEntryList());
734 if (cur + eNum < first) {
737 Info(
"TPacketizerAdaptive",
" --> skip element cur %lld", cur);
742 if (num != -1 && (first+num <= cur)) {
745 Info(
"TPacketizerAdaptive",
" --> drop element cur %lld", cur);
750 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
755 e->
SetFirst( eFirst + (first - cur) );
758 Info(
"TPacketizerAdaptive",
" --> adjust start %lld and end %lld",
759 eFirst + (first - cur), first + num - cur);
762 if (num != -1 && (first+num <= cur+eNum)) {
767 Info(
"TPacketizerAdaptive",
" --> adjust end %lld", first + num - cur);
774 Info(
"TPacketizerAdaptive",
" --> increment 'cur' by %lld", eNum);
788 Info(
"TPacketizerAdaptive",
" --> entry-list element: %lld entries", eNum);
791 eNum = evl ? evl->
GetN() : eNum;
793 Info(
"TPacketizerAdaptive",
" --> event-list element: %lld entries (evl:%p)", eNum, evl);
797 Info(
"TPacketizerAdaptive",
" --> empty entry- or event-list element!");
802 Info(
"TPacketizerAdaptive",
" --> next cur %lld", cur);
818 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
826 TIter iString(partitions);
846 node =
new TFileNode(nodeStr, fStrategy, fFilesToProcess);
849 Info(
"TPacketizerAdaptive",
" --> creating new node '%s' for element", nodeStr.
Data());
852 Info(
"TPacketizerAdaptive",
" --> adding element to exiting node '%s'", nodeStr.
Data());
858 node->IncEvents(eNum);
862 Info(
"TPacketizerAdaptive",
"processing %lld entries in %d files on %d hosts",
876 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
"return");
903 Int_t noRemoteFiles = 0;
905 Int_t totalNumberOfFiles = 0;
907 while (TFileNode *fn = (TFileNode*)
next()) {
908 totalNumberOfFiles += fn->GetNumberOfFiles();
909 if (fn->GetMySlaveCnt() == 0) {
910 noRemoteFiles += fn->GetNumberOfFiles();
915 if (totalNumberOfFiles == 0) {
916 Info(
"InitStats",
"no valid or non-empty file found: setting invalid");
929 PDB(kPacketizer,1)
Info(
"InitStats",
"return");
943 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->GetName());
944 file = node->GetNextUnAlloc();
947 if (nodeHostName && strlen(nodeHostName) > 0) {
958 TUrl uu(fn->GetName());
960 Info(
"GetNextUnAlloc",
"comparing %s with %s...", nodeHostName, uu.GetHost());
963 if (!strcmp(nodeHostName, uu.GetHost())) {
967 if ((file = node->GetNextUnAlloc()) == 0) {
972 Info(
"GetNextUnAlloc",
"found! (host: %s)", uu.GetHost());
977 Warning(
"GetNextUnAlloc",
"unallocate entry %d is empty!", i);
984 Info(
"GetNextUnAlloc",
"reached Workers-per-Node Limit (%ld)",
fMaxSlaveCnt);
990 while (file == 0 && ((node =
NextNode()) != 0)) {
992 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->GetName());
1005 PDB(kPacketizer, 2) {
1007 Info(
"GetNextUnAlloc",
"no file found!");
1023 PDB(kPacketizer,2) {
1052 TFileStat *file = 0;
1055 file = node->GetNextActive();
1069 PDB(kPacketizer,2) {
1070 Info(
"NextActiveNode",
"enter");
1078 Info(
"NextActiveNode",
"reached Workers-per-Node limit (%ld)",
fMaxSlaveCnt);
1090 TFileNode *node = file->GetNode();
1092 node->RemoveActive(file);
1116 while ((fn = (TFileNode*) files.
Next()) != 0) {
1122 while ((key = slaves.
Next()) != 0) {
1125 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
1130 TFileNode *fnmin = 0;
1133 while ((fn = (TFileNode*) files.
Next()) != 0) {
1134 if (!strcmp(slstat->GetName(),
TUrl(fn->GetName()).GetHost())) {
1135 if (fn->GetMySlaveCnt() < fncnt) {
1137 fncnt = fn->GetMySlaveCnt();
1142 slstat->SetFileNode(fnmin);
1143 fnmin->IncMySlaveCnt();
1145 Info(
"Reset",
"assigning node '%s' to '%s' (cnt: %d)",
1146 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1148 slstat->fCurFile = 0;
1159 TMap slaves_by_sock;
1171 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
1188 TString msg(
"Validating files");
1205 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
1209 TFileNode *node = 0;
1210 TFileStat *file = 0;
1213 if ((node = slstat->GetFileNode()) != 0) {
1214 PDB(kPacketizer,3) node->Print();
1217 slstat->SetFileNode(0);
1228 slstat->fCurFile = file;
1231 if (entries < 0 || strlen(elem->
GetTitle()) <= 0) {
1233 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1240 s->GetSocket()->Send( m );
1243 Info(
"ValidateFiles",
1244 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1245 s->GetOrdinal(), s->GetName(), s->GetSocket(),
1256 Error(
"ValidateFiles",
1257 "first (%lld) higher then number of entries (%lld) in %s",
1260 slstat->fCurFile->SetDone();
1264 if (elem->
GetNum() == -1) {
1267 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
1273 Info(
"ValidateFiles",
1274 "found elem '%s' with %lld entries", elem->
GetFileName(), entries);
1292 if (byfile && maxent > 0) {
1294 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1295 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1298 Info(
"ValidateFiles",
"{%lld, %lld, %lld}: needs to validate %lld more files",
1299 maxent, totent, nopenf, nrestf);
1301 while ((slm = (
TSlave *) si.
Next()) && nrestf--) {
1307 Info(
"ValidateFiles",
"no need to validate more files");
1315 PDB(kPacketizer,3) {
1316 Info(
"ValidateFiles",
"waiting for %d slaves:", mon.
GetActive());
1322 Info(
"ValidateFiles",
" worker-%s (%s)",
1331 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
1338 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
1343 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
1345 ((
TProof*)
gProof)->MarkBad(slave,
"socket got invalid during validation");
1352 if (sock->
Recv(reply) <= 0) {
1354 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
1357 ((
TProof*)
gProof)->MarkBad(slave,
"receive failed during validation");
1367 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1380 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1383 (*reply) >> entries;
1388 (*reply) >> objname;
1400 Error(
"ValidateFiles",
1401 "first (%lld) higher then number of entries (%lld) in %s",
1405 slavestat->fCurFile->SetDone();
1413 Error(
"ValidateFiles",
1414 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1430 Error(
"ValidateFiles",
"cannot get entries for file: %s - skipping", e->
GetFileName() );
1437 m <<
TString(
Form(
"Cannot get entries for file: %s - skipping",
1446 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated", totent);
1449 if (maxent < 0 || ((totent < maxent) && !byfile))
1469 while ( (el = dynamic_cast<TDSetElement*> (
next())) ) {
1470 if (el->GetValid()) {
1472 el->SetTDSetOffset(offset);
1484 if (fStrategy == 0) {
1497 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1498 Float_t rate = slstat->GetCurRate();
1500 rate = slstat->GetAvgRate();
1519 if (fFilesToProcess->
Last()) {
1521 if (elem) maxEntries = elem->
GetNum();
1524 PDB(kPacketizer,3) {
1525 Info(
"CalculatePacketSize",
"%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1526 Info(
"CalculatePacketSize",
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1527 slstat->GetOrdinal(), fFilesToProcess->
GetSize(),
1534 if (bevt > 0. && cachesz > 0 && cpsync) {
1535 if ((
Long64_t)(rate * packetTime * bevt) < cachesz)
1536 packetTime = cachesz / bevt / rate;
1544 num = (
Long64_t)(rate * packetTime);
1548 Info(
"CalculatePacketSize",
"%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1550 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1555 num = (learnent > 0) ? 5 * learnent : 1000;
1559 Info(
"CalculatePacketSize",
"%s: num: %lld", slstat->GetOrdinal(), num);
1562 if (num < 1) num = 1;
1574 TList **listOfMissingFiles)
1579 Error(
"AddProcessed",
"%s: TSlaveStat instance for worker %s not found!",
1581 (sl ? sl->
GetName() :
"**undef**"));
1587 if ( slstat->fCurElem != 0 ) {
1588 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1592 numev = status->
GetEntries() - slstat->GetEntriesProcessed();
1600 progress = slstat->AddProcessed(status);
1602 (*fProgressStatus) += *progress;
1604 slstat->UpdateRates(status);
1611 Info(
"AddProcessed",
"%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1617 slstat->fCurElem->GetFileName(),
1625 if (numev != expectedNumEv) {
1631 if (newPacket && numev < expectedNumEv) {
1633 newPacket->
SetFirst(first + numev);
1637 Error(
"AddProcessed",
"%s: processed too much? (%lld, %lld)",
1650 slstat->fCurElem = 0;
1651 return (expectedNumEv - numev);
1678 Error(
"GetNextPacket",
"TSlaveStat instance for worker %s not found!",
1679 (sl ? sl->
GetName() :
"**undef**"));
1684 TFileStat *file = slstat->fCurFile;
1690 Int_t learnent = -1;
1691 if ( slstat->fCurElem != 0 ) {
1694 Double_t latency, proctime, proccpu;
1704 (*r) >> cachesz >> learnent;
1714 (*r) >> latency >> proctime >> proccpu;
1725 if (!fileNotOpen && !fileCorrupted) {
1727 Error(
"GetNextPacket",
"%s: the worker processed a different # of entries", sl->
GetOrdinal());
1730 Error(
"GetNextPacket",
"%s: processed too many entries! (%lld, %lld)",
1738 if (file->GetElement()) {
1739 if (fileCorrupted) {
1740 Info(
"GetNextPacket",
"%s: file '%s' turned corrupted: invalidating file (%lld)",
1741 sl->
GetOrdinal(), file->GetElement()->GetName(), restEntries);
1744 Info(
"GetNextPacket",
"%s: %d entries un-processed", sl->
GetOrdinal(), nunproc);
1749 num = file->GetElement()->GetEntries() + restEntries;
1753 Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1754 num = restEntries + rest;
1756 file->GetElement()->SetEntries(num);
1758 Info(
"GetNextPacket",
"%s: removed file: %s, entries left: %lld", sl->
GetOrdinal(),
1759 file->GetElement()->GetName(), file->GetElement()->GetEntries());
1763 Info(
"GetNextPacket",
"%s: file '%s' could not be open: invalidating related element",
1764 sl->
GetOrdinal(), file->GetElement()->GetName());
1767 file->GetElement()->Invalidate();
1777 Info(
"GetNextPacket",
"%s: error raised by worker, but TFileStat object invalid:"
1782 firstPacket =
kTRUE;
1791 if (file != 0) nodeName = file->GetNode()->GetName();
1792 TString nodeHostName(slstat->GetName());
1795 Info(
"GetNextPacket",
"%s: entries processed: %lld - looking for a packet from node '%s'",
1799 if ( file != 0 && file->IsDone() ) {
1800 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1801 file->GetNode()->DecRunSlaveCnt();
1804 file->GetElement()->GetFileName(),
kFALSE);
1808 slstat->fCurFile = file;
1821 if ( slstat->GetFileNode() != 0 ) {
1825 Bool_t nonLocalNodePossible;
1827 nonLocalNodePossible = 0;
1829 nonLocalNodePossible = firstNonLocalNode ?
1830 (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() <
fMaxSlaveCnt))
1832 openLocal = !nonLocalNodePossible;
1833 Float_t slaveRate = slstat->GetAvgRate();
1834 if ( nonLocalNodePossible && fStrategy == 1) {
1836 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1837 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1841 else if ( slaveRate == 0 ) {
1844 if ( slstat->GetLocalEventsLeft() * localPreference
1845 > (avgEventsLeftPerSlave))
1847 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1848 < slstat->GetLocalEventsLeft() * localPreference )
1850 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1852 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1856 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1858 Float_t avgTime = avgEventsLeftPerSlave
1860 if (slaveTime * localPreference > avgTime)
1862 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1863 < slstat->GetLocalEventsLeft() * localPreference)
1867 if (openLocal || fStrategy == 0) {
1869 file = slstat->GetFileNode()->GetNextUnAlloc();
1871 file = slstat->GetFileNode()->GetNextActive();
1874 slstat->SetFileNode(0);
1887 if (file == 0)
return 0;
1889 PDB(kPacketizer,3)
if (fFilesToProcess) fFilesToProcess->
Print();
1891 slstat->fCurFile = file;
1893 if (file->GetNode()->GetMySlaveCnt() == 0 &&
1894 file->GetElement()->GetFirst() == file->GetNextEntry()) {
1897 Error(
"GetNextPacket",
1898 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1903 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1904 file->GetNode()->IncRunSlaveCnt();
1907 file->GetNode()->GetName(),
1908 file->GetElement()->GetFileName(),
kTRUE);
1916 Long64_t first = file->GetNextEntry();
1922 if ( first + num * 1.5 >= last ) {
1930 file->MoveNextEntry(num);
1934 slstat->fCurElem->SetEntryList(base->
GetEntryList(), first, num);
1945 return slstat->fCurElem;
1956 while ((key = nxw())) {
1958 if (wrkstat && wrkstat->fCurFile) actw++;
1976 while ((key = nxw()) != 0) {
1978 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1980 currate += slstat->GetProgressStatus()->GetCurrentRate();
2022 while ((key = nxw()) != 0) {
2026 Long64_t e = slstat->GetEntriesProcessed();
2027 if (e <= 0) all =
kFALSE;
2029 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2031 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2032 : slstat->GetAvgRate();
2040 Info(
"GetEstEntriesProcessed",
"%s: e:%lld rate:%f dt:%f e:%lld",
2041 slstat->fSlave->GetOrdinal(),
2042 slstat->GetEntriesProcessed(), rate, dt, e);
2049 Info(
"GetEstEntriesProcessed",
2050 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2059 return ((all) ? 0 : 1);
2073 TList **listOfMissingFiles)
2077 Error(
"MarkBad",
"Worker does not exist");
2081 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2082 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2083 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2090 TList *subSet = slaveStat->GetProcessedSubSet();
2093 if (slaveStat->fCurElem) {
2094 subSet->
Add(slaveStat->fCurElem);
2097 Int_t nmg = 0, ntries = 100;
2111 }
while (nmg > 0 && --ntries > 0);
2117 Warning(
"MarkBad",
"subset processed by bad worker not found!");
2119 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2133 TList **listOfMissingFiles)
2136 Error(
"ReassignPacket",
"empty packet!");
2157 node->DecreaseProcessed(e->
GetNum());
2166 if (listOfMissingFiles && *listOfMissingFiles)
2167 (*listOfMissingFiles)->Add((
TObject *)fi);
2178 TList **listOfMissingFiles)
2181 Error(
"SplitPerHost",
"Empty list of packets!");
2184 if (elements->
GetSize() <= 0) {
2185 Error(
"SplitPerHost",
"The input list contains no elements");
2188 TIter subSetIter(elements);
2194 Error(
"SplitPerHost",
"Error removing a missing file");
const char * GetHost() const
const char * GetName() const
Returns name of object.
virtual const char * GetTitle() const
Returns title of object.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
const char * GetOrdinal() const
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
TProofProgressStatus * fStatus
void Reset()
Reset the internal data structure for packet distribution.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
void Add(THist< DIMENSION, PRECISIONA > &to, THist< DIMENSION, PRECISIONB > &from)
TSocket * GetSocket() const
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
Collectable string class.
virtual Bool_t IsValid() const
Float_t fBaseLocalPreference
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to...
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
void SetTDSetOffset(Long64_t offset)
This class represents a WWW compatible URL.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
const char * GetProtocol() const
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Long64_t GetBytesRead() const
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Double_t GetLastUpdate() const
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
Basic time type with millisecond precision.
virtual void DeActivateAll()
De-activate all activated sockets.
Long64_t fNEventsOnRemLoc
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual ~TPacketizerAdaptive()
Destructor.
Int_t GetActiveWorkers()
Return the number of workers still processing.
TFileNode * NextNode()
Get next node which has unallocated files.
Double_t GetProcTime() const
const char * GetObjName() const
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
virtual Int_t GetN() const
const char * Data() const
Double_t GetCPUTime() const
Double_t GetCumProcTime() const
Long64_t GetTDSetOffset() const
TFileStat * GetNextActive()
Get next active file.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries...
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Long64_t GetBytesRead() const
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
virtual void AddAll(const TCollection *col)
Double_t fMaxEntriesRatio
TList * GetListOfElements() const
A sorted doubly linked list.
std::vector< std::vector< double > > Data
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; a meaningful difference from TPacketizer is the fact that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
TSocket * Select()
Return pointer to socket for which an event is waiting.
TObject * GetParameter(const char *par) const
Get specified parameter.
TProofProgressStatus * fProgressStatus
Long64_t GetFirst() const
virtual Bool_t IsSortable() const
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
const char * GetName() const
Returns name of object.
void SetLastEntries(Long64_t entries)
Named parameter, streamable and storable.
TSocket * GetSocket() const
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
const char * GetFileName() const
TString GetString() const
Float_t GetProcTime() const
R__EXTERN TSystem * gSystem
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t TestBit(UInt_t f) const
char * Form(const char *fmt,...)
TObject * GetEntryList() const
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
A TEventList object is a list of selected events (entries) in a TTree.
virtual const char * GetName() const
Returns name of object.
Double_t GetProcTime() const
Long64_t GetReadCalls() const
void Reset(Detail::TBranchProxy *x)
Long64_t GetEntries() const
Int_t MergeElement(TDSetElement *elem)
Check if 'elem' is overlapping or subsequent and, if the case, return a merged element.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
const char * GetDataSet() const
void SetHost(const char *host)
Float_t fFractionOfRemoteFiles
R__EXTERN TProof * gProof
virtual Int_t GetSize() const
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual const char * GetName() const
Returns name of object.
virtual const char * HostName()
Return the system's host name.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
ClassImp(TPacketizerAdaptive) TPacketizerAdaptive
Constructor.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
TSortedList * fFilesToProcess
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
void SetNum(Long64_t num)
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Int_t GetProtocol() const
virtual Long64_t GetN() const
Mother of all ROOT objects.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
R__EXTERN TProofServ * gProofServ
virtual void Add(TObject *obj)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
const char * GetType() const
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Class describing a generic file including meta information.
TProofProgressStatus * GetProgressStatus()
void SetFirst(Long64_t first)
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
A List of entry numbers in a TTree or TChain.
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
const char * GetFile() const
const char * GetDirectory() const
Return directory where to look for object.
Long64_t GetEntriesProcessed() const
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.