75class TPacketizerAdaptive::TFileStat : 
public TObject {
 
   86   Bool_t         IsDone()
 const {
return fIsDone;}
 
   88   void           SetDone() {fIsDone = 
kTRUE;}
 
   89   TFileNode     *GetNode()
 const {
return fNode;}
 
   91   Long64_t       GetNextEntry()
 const {
return fNextEntry;}
 
   92   void           MoveNextEntry(
Long64_t step) {fNextEntry += step;}
 
   99      const TFileStat *fst = 
dynamic_cast<const TFileStat*
>(obj);
 
  100      if (fst && GetElement() && fst->GetElement()) {
 
  101         Long64_t ent = GetElement()->GetNum();
 
  102         Long64_t entfst = fst->GetElement()->GetNum();
 
  103         if (ent > 0 && entfst > 0) {
 
  106            } 
else if (ent < entfst) {
 
  118      Printf(
"TFileStat: %s %lld", fElement ? fElement->
GetName() : 
"---",
 
  119                                   fElement ? fElement->
GetNum() : -1);
 
  123TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, 
TDSetElement *elem, 
TList *files)
 
  124   : fIsDone(
kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
 
  127   if (files) files->
Add(
this);
 
  133class TPacketizerAdaptive::TFileNode : 
public TObject {
 
  156   ~TFileNode() { 
delete fFiles; 
delete fActFiles; }
 
  158   void        IncMySlaveCnt() { fMySlaveCnt++; }
 
  159   Int_t       GetMySlaveCnt()
 const { 
return fMySlaveCnt; }
 
  160   void        IncExtSlaveCnt(
const char *slave) { 
if (fNodeName != slave) fExtSlaveCnt++; }
 
  161   void        DecExtSlaveCnt(
const char *slave) { 
if (fNodeName != slave) fExtSlaveCnt--; 
R__ASSERT(fExtSlaveCnt >= 0); }
 
  162   Int_t       GetSlaveCnt()
 const { 
return fMySlaveCnt + fExtSlaveCnt; }
 
  163   void        IncRunSlaveCnt() { fRunSlaveCnt++; }
 
  164   void        DecRunSlaveCnt() { fRunSlaveCnt--; 
R__ASSERT(fRunSlaveCnt >= 0); }
 
  165   Int_t       GetRunSlaveCnt()
 const { 
return fRunSlaveCnt; }
 
  166   Int_t       GetExtSlaveCnt()
 const { 
return fExtSlaveCnt; }
 
  167   Int_t       GetNumberOfActiveFiles()
 const { 
return fActFiles->
GetSize(); }
 
  171                  { fProcessed += nEvents; }
 
  172   Long64_t    GetProcessed()
 const { 
return fProcessed; }
 
  173   void        DecreaseProcessed(
Long64_t nEvents) { fProcessed -= nEvents; }
 
  176   Long64_t    GetEventsLeftPerSlave()
 const 
  177      { 
return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
 
  178   void        IncEvents(
Long64_t nEvents) { fEvents += nEvents; }
 
  179   const char *
GetName()
 const { 
return fNodeName.
Data(); }
 
  180   Long64_t    GetNEvents()
 const { 
return fEvents; }
 
  187      Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
 
  188      Printf(
"+++ TFileNode: %s +++", fNodeName.
Data());
 
  189      Printf(
"+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
 
  190      Printf(
"+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
 
  192      if (fFiles && fFiles->
GetSize() > 0) {
 
  194         while ((fs = (TFileStat *) nxf())) {
 
  195            if ((
e = fs->GetElement())) {
 
  196               Printf(
"+++  #%d: %s  %lld - %lld (%lld) - next: %lld ", ++nn, 
e->GetName(),
 
  197                     e->GetFirst(), 
e->GetFirst() + 
e->GetNum() - 1, 
e->GetNum(), fs->GetNextEntry());
 
  199               Printf(
"+++  #%d: no element! ", ++nn);
 
  203      Printf(
"+++ Active files: %d ", fActFiles ? fActFiles->
GetSize() : 0);
 
  204      if (fActFiles && fActFiles->
GetSize() > 0) {
 
  205         TIter nxaf(fActFiles);
 
  206         while ((fs = (TFileStat *) nxaf())) {
 
  207            if ((
e = fs->GetElement())) {
 
  208               Printf(
"+++  #%d: %s  %lld - %lld (%lld) - next: %lld", ++nn, 
e->GetName(),
 
  209                      e->GetFirst(), 
e->GetFirst() + 
e->GetNum() - 1, 
e->GetNum(), fs->GetNextEntry());
 
  211               Printf(
"+++  #%d: no element! ", ++nn);
 
  215      Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
 
  221      TFileStat *
f = 
new TFileStat(
this, elem, files);
 
  223      if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->
First();
 
  228      TObject *next = fUnAllocFileNext;
 
  232         fActFiles->
Add(next);
 
  233         if (fActFileNext == 0) fActFileNext = fActFiles->
First();
 
  236         fUnAllocFileNext = fFiles->
After(fUnAllocFileNext);
 
  238      return (TFileStat *) next;
 
  245      if (fActFileNext != 0) {
 
  246         fActFileNext = fActFiles->
After(fActFileNext);
 
  247         if (fActFileNext == 0) fActFileNext = fActFiles->
First();
 
  250      return (TFileStat *) next;
 
  255      if (fActFileNext == 
file) fActFileNext = fActFiles->
After(
file);
 
  258      if (fActFileNext == 0) fActFileNext = fActFiles->
First();
 
  269      const TFileNode *obj = 
dynamic_cast<const TFileNode*
>(other);
 
  271         Error(
"Compare", 
"input is not a TPacketizer::TFileNode object");
 
  279         Int_t myVal = GetRunSlaveCnt();
 
  280         Int_t otherVal = obj->GetRunSlaveCnt();
 
  281         if (myVal < otherVal) {
 
  283         } 
else if (myVal > otherVal) {
 
  287            if ((fEvents - fProcessed) >
 
  288                (obj->GetNEvents() - obj->GetProcessed())) {
 
  295         Int_t myVal = GetSlaveCnt();
 
  296         Int_t otherVal = obj->GetSlaveCnt();
 
  297         if (myVal < otherVal) {
 
  299         } 
else if (myVal > otherVal) {
 
  309      fUnAllocFileNext = fFiles->
First();
 
  320   : fNodeName(
name), fFiles(new 
TList), fUnAllocFileNext(0),
 
  321     fActFiles(new 
TList), fActFileNext(0), fMySlaveCnt(0),
 
  322     fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
 
  323     fStrategy(strategy), fFilesToProcess(files)
 
  338   TFileNode     *fFileNode;     
 
  346   TSlaveStat(
TSlave *slave);
 
  348   TFileNode  *GetFileNode()
 const { 
return fFileNode; }
 
  351   TFileStat  *GetCurFile() { 
return fCurFile; }
 
  352   void        SetFileNode(TFileNode *node) { fFileNode = node; }
 
  356      return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
 
  357   Int_t       GetLocalEventsLeft() {
 
  358      return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
 
  359   TList      *GetProcessedSubSet() { 
return fDSubSet; }
 
  367TPacketizerAdaptive::TSlaveStat::TSlaveStat(
TSlave *slave)
 
  368   : fFileNode(0), fCurFile(0), fCurElem(0),
 
  369     fCurProcessed(0), fCurProcTime(0)
 
  371   fDSubSet = 
new TList();
 
  378   if (strcmp(slave->
ClassName(), 
"TSlaveLite")) {
 
  381      if (fWrkFQDN.Contains(
"localhost") || fWrkFQDN == 
"127.0.0.1")
 
  385      Info("TSlaveStat", "wrk FQDN: %
s", fWrkFQDN.Data());
 
  403      Error(
"UpdateRates", 
"no status object!");
 
  406   if (fCurFile->IsDone()) {
 
  426   if (st && fDSubSet && fCurElem) {
 
  429      fDSubSet->
Add(fCurElem);
 
  433      Error(
"AddProcessed", 
"processed subset of current elem undefined");
 
  450   PDB(kPacketizer,1) 
Info(
"TPacketizerAdaptive",
 
  451                           "enter (first %lld, num %lld)", 
first, num);
 
  469      Error(
"TPacketizerAdaptive", 
"No progress status");
 
  477      cpsync = 
gEnv->
GetValue(
"Packetizer.CachePacketSync", 1);
 
  497      Info(
"TPacketizerAdaptive", 
"using the basic strategy of TPacketizer");
 
  498   } 
else if (strategy != 1) {
 
  499      Warning(
"TPacketizerAdaptive", 
"unsupported strategy index (%d): ignore", strategy);
 
  504      if (maxSlaveCnt < 0) {
 
  505         Info(
"TPacketizerAdaptive",
 
  506              "The value of PROOF_MaxSlavesPerNode must be positive");
 
  514            Info(
"TPacketizerAdaptive",
 
  515                 "The value of PROOF_MaxSlavesPerNode must be positive");
 
  518         maxSlaveCnt = (
Long_t) mxslcnt;
 
  523      maxSlaveCnt = 
gEnv->
GetValue(
"Packetizer.MaxWorkersPerNode", 0);
 
  524   if (maxSlaveCnt > 0) {
 
  526      Info(
"TPacketizerAdaptive", 
"Setting max number of workers per node to %ld",
 
  536   Int_t forceLocal = 0;
 
  541         Info(
"TPacketizerAdaptive",
 
  542            "The only accepted value of PROOF_ForceLocal parameter is 1 !");
 
  551   Int_t packetAsAFraction = 0;
 
  553      if (packetAsAFraction > 0) {
 
  555         Info(
"TPacketizerAdaptive",
 
  556              "using alternate fraction of query time as a packet size: %d",
 
  559         Info(
"TPacketizerAdaptive", 
"packetAsAFraction parameter must be higher than 0");
 
  564   Int_t tryReassign = 0;
 
  566      tryReassign = 
gEnv->
GetValue(
"Packetizer.TryReassign", 0);
 
  569      Info(
"TPacketizerAdaptive", 
"failed packets will be re-assigned");
 
  601      partitionsStr = 
gEnv->
GetValue(
"Packetizer.Partitions", 
"");
 
  602   if (!partitionsStr.
IsNull()) {
 
  603      Info(
"TPacketizerAdaptive", 
"Partitions: %s", partitionsStr.
Data());
 
  604      partitions = partitionsStr.
Tokenize(
",");
 
  612      if (
e->GetValid()) 
continue;
 
  618      TUrl url = 
e->GetFileName();
 
  620         Info(
"TPacketizerAdaptive", 
"element name: %s (url: %s)", 
e->GetFileName(), url.
GetUrl());
 
  636      if (host.
Contains(
"localhost") || host == 
"127.0.0.1") {
 
  644         TIter iString(partitions);
 
  666            Info(
"TPacketizerAdaptive", 
"creating new node '%s' or the element", nodeStr.
Data());
 
  669            Info(
"TPacketizerAdaptive", 
"adding element to existing node '%s'", nodeStr.
Data());
 
  689   Int_t validateMode = 0;
 
  694         Info(
"TPacketizerAdaptive",
 
  695              "processing subset of entries: validating by file? %s", byfile ? 
"yes": 
"no");
 
  710      Info(
"TPacketizerAdaptive",
 
  711           "processing range: first %lld, num %lld", 
first, num);
 
  719      if (!
e->GetValid()) 
continue;
 
  721      TUrl url = 
e->GetFileName();
 
  725         Info(
"TPacketizerAdaptive", 
"processing element '%s'", 
e->GetFileName());
 
  727         Info(
"TPacketizerAdaptive",
 
  728              " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, 
e->GetEntryList());
 
  730      if (!
e->GetEntryList()) {
 
  732         if (cur + eNum < 
first) {
 
  735               Info(
"TPacketizerAdaptive", 
" --> skip element cur %lld", cur);
 
  740         if (num != -1 && (
first+num <= cur)) {
 
  743               Info(
"TPacketizerAdaptive", 
" --> drop element cur %lld", cur);
 
  748         if (cur <= 
first || (num != -1 && (
first+num <= cur+eNum))) {
 
  753               e->SetFirst( eFirst + (
first - cur) );
 
  754               e->SetNum( 
e->GetNum() - (
first - cur) );
 
  756                  Info(
"TPacketizerAdaptive", 
" --> adjust start %lld and end %lld",
 
  760            if (num != -1 && (
first+num <= cur+eNum)) {
 
  763               e->SetNum( 
first + num - 
e->GetFirst() - cur );
 
  765                  Info(
"TPacketizerAdaptive", 
" --> adjust end %lld", 
first + num - cur);
 
  772               Info(
"TPacketizerAdaptive", 
" --> increment 'cur' by %lld", eNum);
 
  786               Info(
"TPacketizerAdaptive", 
" --> entry-list element: %lld entries", eNum);
 
  789            eNum = evl ? evl->
GetN() : eNum;
 
  791               Info(
"TPacketizerAdaptive", 
" --> event-list element: %lld entries (evl:%p)", eNum, evl);
 
  795               Info(
"TPacketizerAdaptive", 
" --> empty entry- or event-list element!");
 
  800         Info(
"TPacketizerAdaptive", 
" --> next cur %lld", cur);
 
  816      if (host.
Contains(
"localhost") || host == 
"127.0.0.1") {
 
  824         TIter iString(partitions);
 
  847            Info(
"TPacketizerAdaptive", 
" --> creating new node '%s' for element", nodeStr.
Data());
 
  850            Info(
"TPacketizerAdaptive", 
" --> adding element to exiting node '%s'", nodeStr.
Data());
 
  856      node->IncEvents(eNum);
 
  857      PDB(kPacketizer,2) 
e->Print(
"a");
 
  860      Info(
"TPacketizerAdaptive", 
"processing %lld entries in %d files on %d hosts",
 
  874   PDB(kPacketizer,1) 
Info(
"TPacketizerAdaptive", 
"return");
 
  901   Int_t noRemoteFiles = 0;
 
  903   Int_t totalNumberOfFiles = 0;
 
  905   while (TFileNode *fn = (TFileNode*)next()) {
 
  906      totalNumberOfFiles += fn->GetNumberOfFiles();
 
  907      if (fn->GetMySlaveCnt() == 0) {
 
  908         noRemoteFiles += fn->GetNumberOfFiles();
 
  913   if (totalNumberOfFiles == 0) {
 
  914      Info(
"InitStats", 
"no valid or non-empty file found: setting invalid");
 
  927   PDB(kPacketizer,1) 
Info(
"InitStats", 
"return");
 
  941         Info(
"GetNextUnAlloc", 
"looking for file on node %s", node->GetName());
 
  942      file = node->GetNextUnAlloc();
 
  945      if (nodeHostName && strlen(nodeHostName) > 0) {
 
  956               TUrl uu(fn->GetName());
 
  958                  Info(
"GetNextUnAlloc", 
"comparing %s with %s...", nodeHostName, uu.
GetHost());
 
  961               if (!strcmp(nodeHostName, uu.
GetHost())) {
 
  965                  if ((
file = node->GetNextUnAlloc()) == 0) {
 
  970                        Info(
"GetNextUnAlloc", 
"found! (host: %s)", uu.
GetHost());
 
  975               Warning(
"GetNextUnAlloc", 
"unallocate entry %d is empty!", i);
 
  982               Info(
"GetNextUnAlloc", 
"reached Workers-per-Node Limit (%ld)", 
fMaxSlaveCnt);
 
  990               Info(
"GetNextUnAlloc", 
"looking for file on node %s", node->GetName());
 
 1003   PDB(kPacketizer, 2) {
 
 1005         Info(
"GetNextUnAlloc", 
"no file found!");
 
 1021   PDB(kPacketizer,2) {
 
 1050   TFileStat *
file = 0;
 
 1053         file = node->GetNextActive();
 
 1067   PDB(kPacketizer,2) {
 
 1068      Info(
"NextActiveNode", 
"enter");
 
 1076         Info(
"NextActiveNode",
"reached Workers-per-Node limit (%ld)", 
fMaxSlaveCnt);
 
 1088   TFileNode *node = 
file->GetNode();
 
 1090   node->RemoveActive(
file);
 
 1114   while ((fn = (TFileNode*) files.
Next()) != 0) {
 
 1120   while ((key = slaves.
Next()) != 0) {
 
 1123         Warning(
"Reset", 
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
 
 1128      TFileNode *fnmin = 0;
 
 1131      while ((fn = (TFileNode*) files.
Next()) != 0) {
 
 1132         if (!strcmp(slstat->GetName(), 
TUrl(fn->GetName()).
GetHost())) {
 
 1133            if (fn->GetMySlaveCnt() < fncnt) {
 
 1135               fncnt = fn->GetMySlaveCnt();
 
 1140         slstat->SetFileNode(fnmin);
 
 1141         fnmin->IncMySlaveCnt();
 
 1143            Info(
"Reset",
"assigning node '%s' to '%s' (cnt: %d)",
 
 1144                         fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
 
 1146      slstat->fCurFile = 0;
 
 1157   TMap     slaves_by_sock;
 
 1169      Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
 
 1186   TString msg(
"Validating files");
 
 1203            Error(
"ValidateFiles", 
"TSlaveStat associated to slave '%s' is NULL", 
s->GetName());
 
 1207         TFileNode *node = 0;
 
 1208         TFileStat *
file = 0;
 
 1211         if ((node = slstat->GetFileNode()) != 0) {
 
 1212            PDB(kPacketizer,3) node->Print();
 
 1215               slstat->SetFileNode(0);
 
 1226            slstat->fCurFile = 
file;
 
 1229            if (entries < 0 || strlen(elem->
GetTitle()) <= 0) {
 
 1231               file->GetNode()->IncExtSlaveCnt(slstat->GetName());
 
 1238               s->GetSocket()->Send( 
m );
 
 1241                  Info(
"ValidateFiles",
 
 1242                       "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
 
 1243                       s->GetOrdinal(), 
s->GetName(), 
s->GetSocket(),
 
 1254                        Error(
"ValidateFiles",
 
 1255                              "first (%lld) higher then number of entries (%lld) in %s",
 
 1258                        slstat->fCurFile->SetDone();
 
 1262                     if (elem->
GetNum() == -1) {
 
 1265                        Warning(
"ValidateFiles", 
"num (%lld) + first (%lld) larger then number of" 
 1271                        Info(
"ValidateFiles",
 
 1272                             "found elem '%s' with %lld entries", elem->
GetFileName(), entries);
 
 1290         if (byfile && maxent > 0) {
 
 1292            Long64_t nrestf = (maxent - totent) * nopenf / totent ;
 
 1293            if (nrestf <= 0 && maxent > totent) nrestf = 1;
 
 1296                  Info(
"ValidateFiles", 
"{%lld, %lld, %lld}: needs to validate %lld more files",
 
 1297                                         maxent, totent, nopenf, nrestf);
 
 1299               while ((slm = (
TSlave *) si.
Next()) && nrestf--) {
 
 1305                  Info(
"ValidateFiles", 
"no need to validate more files");
 
 1313      PDB(kPacketizer,3) {
 
 1314         Info(
"ValidateFiles", 
"waiting for %d slaves:", mon.
GetActive());
 
 1320               Info(
"ValidateFiles", 
"   worker-%s (%s)",
 
 1329         Error(
"ValidateFiles", 
"selection has been interrupted - STOP");
 
 1336      PDB(kPacketizer,3) 
Info(
"ValidateFiles", 
"select returned: %p", sock);
 
 1341         Error(
"ValidateFiles", 
"worker-%s (%s) got invalid - STOP",
 
 1343         ((
TProof*)
gProof)->MarkBad(slave, 
"socket got invalid during validation");
 
 1350      if (sock->
Recv(reply) <= 0) {
 
 1352         Error(
"ValidateFiles", 
"Recv failed! for worker-%s (%s)",
 
 1355         ((
TProof*)
gProof)->MarkBad(slave, 
"receive failed during validation");
 
 1365             Error(
"ValidateFiles", 
"kPROOF_FATAL from worker-%s (%s)",
 
 1378      slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
 
 1381      (*reply) >> entries;
 
 1386         (*reply) >> objname;
 
 1387         e->SetTitle(objname);
 
 1390      e->SetTDSetOffset(entries);
 
 1396         if (!
e->GetEntryList()) {
 
 1397            if (
e->GetFirst() > entries) {
 
 1398               Error(
"ValidateFiles",
 
 1399                     "first (%lld) higher then number of entries (%lld) in %s",
 
 1400                      e->GetFirst(), entries, 
e->GetFileName());
 
 1403               slavestat->fCurFile->SetDone();
 
 1408            if (
e->GetNum() == -1) {
 
 1409               e->SetNum(entries - 
e->GetFirst());
 
 1410            } 
else if (
e->GetFirst() + 
e->GetNum() > entries) {
 
 1411               Error(
"ValidateFiles",
 
 1412                     "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
 
 1413                      e->GetNum(), 
e->GetFirst(), entries, 
e->GetFileName());
 
 1414               e->SetNum(entries - 
e->GetFirst());
 
 1428         Error(
"ValidateFiles", 
"cannot get entries for file: %s - skipping", 
e->GetFileName() );
 
 1435            m << 
TString(
Form(
"Cannot get entries for file: %s - skipping",
 
 1444      PDB(kPacketizer,3) 
Info(
"ValidateFiles", 
" %lld events validated", totent);
 
 1447      if (maxent < 0 || ((totent < maxent) && !byfile))
 
 1467   while ( (el = 
dynamic_cast<TDSetElement*
> (next())) ) {
 
 1495      TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
 
 1496      Float_t rate = slstat->GetCurRate();
 
 1498         rate = slstat->GetAvgRate();
 
 1519                  if (elem) maxEntries = elem->
GetNum();
 
 1522                  PDB(kPacketizer,3) {
 
 1523                     Info(
"CalculatePacketSize", 
"%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
 
 1524                     Info(
"CalculatePacketSize", 
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
 
 1532         if (bevt > 0. && cachesz > 0 && cpsync) {
 
 1533            if ((
Long64_t)(rate * packetTime * bevt) < cachesz)
 
 1534               packetTime = cachesz / bevt / rate;
 
 1542         num = (
Long64_t)(rate * packetTime);
 
 1546            Info(
"CalculatePacketSize",
"%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
 
 1548                 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
 
 1553         num = (learnent > 0) ? 5 * learnent : 1000;
 
 1557            Info(
"CalculatePacketSize",
"%s: num: %lld", slstat->GetOrdinal(),  num);
 
 1560   if (num < 1) num = 1;
 
 1572                                        TList **listOfMissingFiles)
 
 1577      Error(
"AddProcessed", 
"%s: TSlaveStat instance for worker %s not found!",
 
 1579                            (sl ? sl->
GetName() : 
"**undef**"));
 
 1585   if ( slstat->fCurElem != 0 ) {
 
 1586      Long64_t expectedNumEv = slstat->fCurElem->GetNum();
 
 1590         numev = status->
GetEntries() - slstat->GetEntriesProcessed();
 
 1598         progress = slstat->AddProcessed(status);
 
 1600            (*fProgressStatus) += *progress;
 
 1602            slstat->UpdateRates(status);
 
 1609            Info(
"AddProcessed", 
"%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
 
 1615                                    slstat->fCurElem->GetFileName(),
 
 1623      if (numev != expectedNumEv) {
 
 1629         if (newPacket && numev < expectedNumEv) {
 
 1635            Error(
"AddProcessed", 
"%s: processed too much? (%lld, %lld)",
 
 1648      slstat->fCurElem = 0;
 
 1649      return (expectedNumEv - numev);
 
 1676      Error(
"GetNextPacket", 
"TSlaveStat instance for worker %s not found!",
 
 1677                            (sl ? sl->
GetName() : 
"**undef**"));
 
 1682   TFileStat *
file = slstat->fCurFile;
 
 1688   Int_t learnent = -1;
 
 1689   if ( slstat->fCurElem != 0 ) {
 
 1692      Double_t latency, proctime, proccpu;
 
 1702            (*r) >> cachesz >> learnent;
 
 1703            if (
r->BufferSize() > 
r->Length()) (*r) >> restEntries;
 
 1712         (*r) >> latency >> proctime >> proccpu;
 
 1714         if (
r->BufferSize() > 
r->Length()) (*r) >> bytesRead;
 
 1715         if (
r->BufferSize() > 
r->Length()) (*r) >> restEntries;
 
 1717         if (
r->BufferSize() > 
r->Length()) (*r) >> totev;
 
 1723      if (!fileNotOpen && !fileCorrupted) {
 
 1725            Error(
"GetNextPacket", 
"%s: the worker processed a different # of entries", sl->
GetOrdinal());
 
 1728               Error(
"GetNextPacket", 
"%s: processed too many entries! (%lld, %lld)",
 
 1736            if (
file->GetElement()) {
 
 1737               if (fileCorrupted) {
 
 1738                  Info(
"GetNextPacket", 
"%s: file '%s' turned corrupted: invalidating file (%lld)",
 
 1742                     Info(
"GetNextPacket", 
"%s: %d entries un-processed", sl->
GetOrdinal(), nunproc);
 
 1747                     num = 
file->GetElement()->GetEntries() + restEntries;
 
 1752                     num = restEntries + rest;
 
 1754                  file->GetElement()->SetEntries(num);
 
 1756                     Info(
"GetNextPacket", 
"%s: removed file: %s, entries left: %lld", sl->
GetOrdinal(),
 
 1757                                           file->GetElement()->GetName(), 
file->GetElement()->GetEntries());
 
 1761                  Info(
"GetNextPacket", 
"%s: file '%s' could not be open: invalidating related element",
 
 1765               file->GetElement()->Invalidate();
 
 1775            Info(
"GetNextPacket", 
"%s: error raised by worker, but TFileStat object invalid:" 
 1780      firstPacket = 
kTRUE;
 
 1789   if (
file != 0) nodeName = 
file->GetNode()->GetName();
 
 1790   TString nodeHostName(slstat->GetName());
 
 1793      Info(
"GetNextPacket", 
"%s: entries processed: %lld - looking for a packet from node '%s'",
 
 1797   if ( 
file != 0 && 
file->IsDone() ) {
 
 1798      file->GetNode()->DecExtSlaveCnt(slstat->GetName());
 
 1799      file->GetNode()->DecRunSlaveCnt();
 
 1806   slstat->fCurFile = 
file;
 
 1819      if ( slstat->GetFileNode() != 0 ) {
 
 1823         Bool_t nonLocalNodePossible;
 
 1825            nonLocalNodePossible = 0;
 
 1827            nonLocalNodePossible = firstNonLocalNode ?
 
 1830         openLocal = !nonLocalNodePossible;
 
 1831         Float_t slaveRate = slstat->GetAvgRate();
 
 1832         if ( nonLocalNodePossible && 
fStrategy == 1) {
 
 1834            if ( slstat->GetFileNode()->GetRunSlaveCnt() >
 
 1835                 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
 
 1839            else if ( slaveRate == 0 ) { 
 
 1842               if ( slstat->GetLocalEventsLeft() * localPreference
 
 1843                   > (avgEventsLeftPerSlave))
 
 1845               else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
 
 1846                     < slstat->GetLocalEventsLeft() * localPreference )
 
 1848               else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
 
 1850               else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
 
 1854               Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
 
 1856               Float_t avgTime = avgEventsLeftPerSlave
 
 1858               if (slaveTime * localPreference > avgTime)
 
 1860               else if ((firstNonLocalNode->GetEventsLeftPerSlave())
 
 1861                        < slstat->GetLocalEventsLeft() * localPreference)
 
 1867            file = slstat->GetFileNode()->GetNextUnAlloc();
 
 1869               file = slstat->GetFileNode()->GetNextActive();
 
 1872               slstat->SetFileNode(0);
 
 1885      if (
file == 0) 
return 0;
 
 1889      slstat->fCurFile = 
file;
 
 1891      if (
file->GetNode()->GetMySlaveCnt() == 0 &&
 
 1892         file->GetElement()->GetFirst() == 
file->GetNextEntry()) {
 
 1895            Error(
"GetNextPacket",
 
 1896                  "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
 
 1901      file->GetNode()->IncExtSlaveCnt(slstat->GetName());
 
 1902      file->GetNode()->IncRunSlaveCnt();
 
 1905                               file->GetNode()->GetName(),
 
 1906                               file->GetElement()->GetFileName(), 
kTRUE);
 
 1920   if ( 
first + num * 1.5 >= last ) {
 
 1928   file->MoveNextEntry(num);
 
 1943   return slstat->fCurElem;
 
 1954   while ((key = nxw())) {
 
 1956      if (wrkstat && wrkstat->fCurFile) actw++;
 
 1974      while ((key = nxw()) != 0) {
 
 1976         if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
 
 1978            currate += slstat->GetProgressStatus()->GetCurrentRate();
 
 2020      while ((key = nxw()) != 0) {
 
 2024            Long64_t e = slstat->GetEntriesProcessed();
 
 2027            dt = now - slstat->GetProgressStatus()->GetLastUpdate();
 
 2029            Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
 
 2030                                                                 : slstat->GetAvgRate();
 
 2038               Info(
"GetEstEntriesProcessed",
"%s: e:%lld rate:%f dt:%f e:%lld",
 
 2039                                          slstat->fSlave->GetOrdinal(),
 
 2040                                          slstat->GetEntriesProcessed(), rate, dt, 
e);
 
 2047      Info(
"GetEstEntriesProcessed",
 
 2048           "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
 
 2057   return ((all) ? 0 : 1);
 
 2071                                  TList **listOfMissingFiles)
 
 2075      Error(
"MarkBad", 
"Worker does not exist");
 
 2079   if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
 
 2080      slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
 
 2081      slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
 
 2088      TList *subSet = slaveStat->GetProcessedSubSet();
 
 2091         if (slaveStat->fCurElem) {
 
 2092            subSet->
Add(slaveStat->fCurElem);
 
 2095         Int_t nmg = 0, ntries = 100;
 
 2101               if (
e->MergeElement(enxt) >= 0) {
 
 2109         } 
while (nmg > 0 && --ntries > 0);
 
 2115         Warning(
"MarkBad", 
"subset processed by bad worker not found!");
 
 2117      (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
 
 2131                                          TList **listOfMissingFiles)
 
 2134      Error(
"ReassignPacket", 
"empty packet!");
 
 2138   TUrl url = 
e->GetFileName();
 
 2155      node->DecreaseProcessed(
e->GetNum());
 
 2164      if (listOfMissingFiles && *listOfMissingFiles)
 
 2165         (*listOfMissingFiles)->Add((
TObject *)fi);
 
 2176                                       TList **listOfMissingFiles)
 
 2179      Error(
"SplitPerHost", 
"Empty list of packets!");
 
 2182   if (elements->
GetSize() <= 0) {
 
 2183      Error(
"SplitPerHost", 
"The input list contains no elements");
 
 2186   TIter subSetIter(elements);
 
 2192            Error(
"SplitPerHost", 
"Error removing a missing file");
 
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TProofServ * gProofServ
R__EXTERN TProof * gProof
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e. the current total amount of space that has been allocated so far.
Manages an element of a TDSet.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
const char * GetObjName() const
TObject * GetEntryList() const
void SetFirst(Long64_t first)
void SetTDSetOffset(Long64_t offset)
void SetNum(Long64_t num)
const char * GetDirectory() const
Return directory where to look for object.
Long64_t GetTDSetOffset() const
const char * GetFileName() const
Long64_t GetFirst() const
This class implements a data set to be used for PROOF processing.
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
virtual void Reset()
Reset or initialize access to the elements.
const char * GetType() const
TList * GetListOfElements() const
A List of entry numbers in a TTree or TChain.
virtual Long64_t GetN() const
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
A TEventList object is a list of selected events (entries) in a TTree.
virtual Int_t GetN() const
Class describing a generic file including meta information.
virtual void Add(TObject *obj)
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual void Clear(Option_t *option="")
Remove all objects from the list.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void DeActivateAll()
De-activate all activated sockets.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual const char * GetTitle() const
Returns title of object.
virtual const char * GetName() const
Returns name of object.
Collectable string class.
const char * GetName() const
Returns name of object.
const TString & GetString() const
Mother of all ROOT objects.
virtual const char * GetName() const
Returns name of object.
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
virtual Bool_t IsSortable() const
virtual const char * ClassName() const
Returns name of class to which the object belongs.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries. The files in the listOfMissingFiles can appear several times; in order to...
virtual ~TPacketizerAdaptive()
Destructor.
Double_t fMaxEntriesRatio
TFileNode * NextNode()
Get next node which has unallocated files.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; A meaningful difference to TPacketizer is the fact that this packetizer,...
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetActiveWorkers()
Return the number of workers still processing.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
Float_t fBaseLocalPreference
Float_t fFractionOfRemoteFiles
Long64_t fNEventsOnRemLoc
TFileNode * NextActiveNode()
Get next active node.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Named parameter, streamable and storable.
Container class for processing statistics.
Double_t GetProcTime() const
Double_t GetLastUpdate() const
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
This class controls a Parallel ROOT Facility, PROOF, cluster.
TObject * GetParameter(const char *par) const
Get specified parameter.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Class describing a PROOF worker server.
TSocket * GetSocket() const
Int_t GetProtocol() const
const char * GetName() const
Returns name of object.
const char * GetOrdinal() const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
A sorted doubly linked list.
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
virtual const char * HostName()
Return the system's host name.
Basic time type with millisecond precision.
This class represents a WWW compatible URL.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
const char * GetFile() const
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
const char * GetHost() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
void SetHost(const char *host)
const char * GetProtocol() const
Long64_t GetEntriesProcessed() const
Double_t GetProcTime() const
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
TProofProgressStatus * fStatus
TProofProgressStatus * GetProgressStatus()
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
Long64_t GetEntriesProcessed() const
Double_t GetCumProcTime() const
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet starting from the first entry with num entries.
void Add(RHist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
static constexpr double s