#include "TAdaptivePacketizer.h"
#include "Riostream.h"
#include "TDSet.h"
#include "TError.h"
#include "TEnv.h"
#include "TEventList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TObject.h"
#include "TParameter.h"
#include "TPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TClass.h"
#include "TRandom.h"
#include "TMath.h"
#include "TObjString.h"
class TAdaptivePacketizer::TFileStat : public TObject {
private:
   Bool_t         fIsDone;       
   TFileNode     *fNode;         
   TDSetElement  *fElement;      
   Long64_t       fNextEntry;    
public:
   TFileStat(TFileNode *node, TDSetElement *elem);
   Bool_t         IsDone() const {return fIsDone;}
   void           SetDone() {fIsDone = kTRUE;}
   TFileNode     *GetNode() const {return fNode;}
   TDSetElement  *GetElement() const {return fElement;}
   Long64_t       GetNextEntry() const {return fNextEntry;}
   void           MoveNextEntry(Long64_t step) {fNextEntry += step;}
};
TAdaptivePacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
   : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
{
}
class TAdaptivePacketizer::TFileNode : public TObject {
private:
   TString        fNodeName;        
   TList         *fFiles;           
   TObject       *fUnAllocFileNext; 
   TList         *fActFiles;        
   TObject       *fActFileNext;     
   Int_t          fMySlaveCnt;      
                                    
   Int_t          fExtSlaveCnt;     
                                    
   Int_t          fRunSlaveCnt;     
                                    
   Long64_t       fProcessed;       
   Long64_t       fEvents;          
public:
   TFileNode(const char *name);
   ~TFileNode() { delete fFiles; delete fActFiles; }
   void        IncMySlaveCnt() { fMySlaveCnt++; }
   Int_t       GetMySlaveCnt() const { return fMySlaveCnt; }
   void        IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
   void        DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
   Int_t       GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
   void        IncRunSlaveCnt() { fRunSlaveCnt++; }
   void        DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
   Int_t       GetRunSlaveCnt() const { return fRunSlaveCnt; }
   Int_t       GetExtSlaveCnt() const { return fExtSlaveCnt; }
   Int_t       GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
   Bool_t      IsSortable() const { return kTRUE; }
   Int_t       GetNumberOfFiles() { return fFiles->GetSize(); }
   void        IncProcessed(Long64_t nEvents)
                  { fProcessed += nEvents; }
   Long64_t    GetProcessed() const { return fProcessed; }
   
   
   Long64_t    GetEventsLeftPerSlave() const
      { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
   void        IncEvents(Long64_t nEvents) { fEvents += nEvents; }
   const char *GetName() const { return fNodeName.Data(); }
   Long64_t    GetNEvents() const { return fEvents; }
   void Add(TDSetElement *elem)
   {
      TFileStat *f = new TFileStat(this,elem);
      fFiles->Add(f);
      if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
   }
   TFileStat *GetNextUnAlloc()
   {
      TObject *next = fUnAllocFileNext;
      if (next != 0) {
         
         fActFiles->Add(next);
         if (fActFileNext == 0) fActFileNext = fActFiles->First();
         
         fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
      }
      return (TFileStat *) next;
   }
   TFileStat *GetNextActive()
   {
      TObject *next = fActFileNext;
      if (fActFileNext != 0) {
         fActFileNext = fActFiles->After(fActFileNext);
         if (fActFileNext == 0) fActFileNext = fActFiles->First();
      }
      return (TFileStat *) next;
   }
   void RemoveActive(TFileStat *file)
   {
      if (fActFileNext == file) fActFileNext = fActFiles->After(file);
      fActFiles->Remove(file);
      if (fActFileNext == 0) fActFileNext = fActFiles->First();
   }
   Int_t Compare(const TObject *other) const
   {
      
      
      
      
      
      const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
      R__ASSERT(obj != 0);
      
      if (fgNetworkFasterThanHD) {
         
         
         
         Int_t myVal = GetRunSlaveCnt();
         Int_t otherVal = obj->GetRunSlaveCnt();
         if (myVal < otherVal) {
            return -1;
         } else if (myVal > otherVal) {
            return 1;
         } else {
            
            if ((fEvents - fProcessed) >
                (obj->GetNEvents() - obj->GetProcessed())) {
               return -1;
            } else {
               return 1;
            }
         }
      } else {
         
         
         
         
         
         
         
         
         
         Long64_t diffEvents =
            GetEventsLeftPerSlave() - obj->GetEventsLeftPerSlave();
         Int_t myExtSlaves = GetExtSlaveCnt();
         Int_t otherExtSlaves = obj->GetExtSlaveCnt();
         Long64_t avEventsLeft = (GetEventsLeftPerSlave()
                                  + obj->GetEventsLeftPerSlave())/2;
         
         Int_t mySlavesProcRemote = GetSlaveCnt() - GetRunSlaveCnt();
         Int_t otherSlavesProcRemote = obj->GetSlaveCnt()
                                       - obj->GetRunSlaveCnt();
         if ( mySlavesProcRemote < otherSlavesProcRemote ) {
            if (diffEvents < -(avEventsLeft / 2)
                && obj->GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
               return 1;
            else
               return -1;
         } else if ( mySlavesProcRemote > otherSlavesProcRemote ) {
            if (diffEvents > (avEventsLeft / 2)
                && GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
               return -1;
            else
               return 1;
         } else if (myExtSlaves < otherExtSlaves) {
            if (diffEvents < -(avEventsLeft / 3)
                && obj->GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
               return 1;
            else
               return -1;
         } else if (myExtSlaves > otherExtSlaves) {
            if (diffEvents > (avEventsLeft / 3)
                && GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
               return -1;
            else
               return 1;
         } else {
            Int_t myOwnSlaves = GetMySlaveCnt();
            Int_t otherOwnSlaves = obj->GetMySlaveCnt();
            if (myOwnSlaves < otherOwnSlaves) {
               
               if (diffEvents < -(avEventsLeft / 3)
                   && obj->GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
                  return 1;
               else
                  return -1;
            } else if (myOwnSlaves > otherOwnSlaves) {
               if (diffEvents > (avEventsLeft / 3)
                   && GetExtSlaveCnt() < TAdaptivePacketizer::fgMaxSlaveCnt)
                  return -1;
               else
                  return 1;
            } else {
            if (diffEvents > 0) {
                  return -1;
               } else if (diffEvents < 0) {
                  return 1;
               } else
                  return 0;
            }
         }
      }
   }
   void Print(Option_t *) const
   {
      cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
           << "\tMySlaveCount " << fMySlaveCnt
           << "\tSlaveCount " << fExtSlaveCnt << endl;
   }
   void Reset()
   {
      fUnAllocFileNext = fFiles->First();
      fActFiles->Clear();
      fActFileNext = 0;
      fExtSlaveCnt = 0;
      fMySlaveCnt = 0;
      fRunSlaveCnt = 0;
   }
};
TAdaptivePacketizer::TFileNode::TFileNode(const char *name)
   : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
     fActFileNext(0), fMySlaveCnt(0), fExtSlaveCnt(0), fProcessed(0), fEvents(0)
{
   
   fFiles->SetOwner();
   fActFiles->SetOwner(kFALSE);
}
class TAdaptivePacketizer::TSlaveStat : public TObject {
friend class TAdaptivePacketizer;
private:
   TSlave        *fSlave;        
   TFileNode     *fFileNode;     
   TFileStat     *fCurFile;      
   TDSetElement  *fCurElem;      
   Long64_t       fProcessed;    
   Float_t        fProcTime;     
   Long64_t       fCurProcessed; 
   Float_t        fCurProcTime;  
public:
   TSlaveStat(TSlave *slave);
   TFileNode  *GetFileNode() const { return fFileNode; }
   const char *GetName() const { return fSlave->GetName(); }
   Long64_t    GetEntriesProcessed() const { return fProcessed; }
   TFileStat  *GetCurFile() { return fCurFile; }
   void        SetFileNode(TFileNode *node) { fFileNode = node; }
   void        UpdateRates(Long64_t nEvents, Float_t time);
   Float_t     GetAvgRate() { return (fProcTime?fProcessed/fProcTime:0); }
   Float_t     GetCurRate() {
      return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
   Int_t       GetLocalEventsLeft() {
      return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
};
TAdaptivePacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
   : fSlave(slave), fFileNode(0), fCurFile(0), fCurElem(0), fProcessed(0),
     fProcTime(0), fCurProcessed(0), fCurProcTime(0)
{
   
}
void TAdaptivePacketizer::TSlaveStat::UpdateRates(Long64_t nEvents,
                                                  Float_t time)
{
   
   if (fCurFile->IsDone()) {
      fCurProcTime = 0;
      fCurProcessed = 0;
   } else {
      fCurProcTime += time;
      fCurProcessed += nEvents;
   }
   fProcTime += time;
   fProcessed += nEvents;
   fCurFile->GetNode()->IncProcessed(nEvents);
}
ClassImp(TAdaptivePacketizer)
Int_t TAdaptivePacketizer::fgMaxSlaveCnt = 2;
Int_t TAdaptivePacketizer::fgNetworkFasterThanHD = 1;
TAdaptivePacketizer::TAdaptivePacketizer(TDSet *dset, TList *slaves,
                          Long64_t first, Long64_t num, TList *input)
{
   
   PDB(kPacketizer,1) Info("TAdaptivePacketizer",
                           "enter (first %lld, num %lld)", first, num);
   
   fSlaveStats = 0;
   fPackets = 0;
   fSlaveStats = 0;
   fUnAllocated = 0;
   fActive = 0;
   fFileNodes = 0;
   fProgress = 0;
   fProcessed = 0;
   fBytesRead = 0;
   fCumProcTime = 0;
   fMaxPerfIdx = 1;
   
   TTime tnow = gSystem->Now();
   fStartTime = Long_t(tnow);
   fInitTime = 0;
   fProcTime = 0;
   fTimeUpdt = -1.;
   fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb");
   fCircN = 10;
   TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
   fCircProg->SetCircular(fCircN);
   Long_t maxSlaveCnt = 0;
   if (!(TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt)))
      fgMaxSlaveCnt = (Int_t) maxSlaveCnt;
   fgNetworkFasterThanHD = gEnv->GetValue("ProofServ.NetworkFasterThanHD", 1);
   if (fgNetworkFasterThanHD != 1)
      Info("TAdaptivePacketizer","fgNetworkFasterThanHD set to %d",
                                 fgNetworkFasterThanHD);
   Double_t baseLocalPreference = 1.2;
   TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference);
   fBaseLocalPreference = (Float_t)baseLocalPreference;
   fPackets = new TList;
   fPackets->SetOwner();
   fFileNodes = new TList;
   fFileNodes->SetOwner();
   fUnAllocated = new TList;
   fUnAllocated->SetOwner(kFALSE);
   fActive = new TList;
   fActive->SetOwner(kFALSE);
   fValid = kTRUE;
   
   dset->Lookup();
   
   dset->Reset();
   TDSetElement *e;
   while ((e = (TDSetElement*)dset->Next())) {
      if (e->GetValid()) continue;
      TUrl url = e->GetFileName();
      
      TString host;
      if ( !url.IsValid() ||
          (strncmp(url.GetProtocol(),"root", 4) &&
           strncmp(url.GetProtocol(),"rfio", 4)) ) {
         host = "no-host";
      } else {
         host = url.GetHost();
      }
      TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
      if (node == 0) {
         node = new TFileNode(host);
         fFileNodes->Add(node);
      }
      node->Add( e );
   }
   fSlaveStats = new TMap;
   fSlaveStats->SetOwner(kFALSE);
   TSlave *slave;
   TIter si(slaves);
   while ((slave = (TSlave*) si.Next())) {
      fSlaveStats->Add( slave, new TSlaveStat(slave) );
      fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
         slave->GetPerfIdx() : fMaxPerfIdx;
   }
   Reset();                
   ValidateFiles(dset, slaves);
   if (!fValid) return;
   
   
   Int_t files = 0;
   fTotalEntries = 0;
   fUnAllocated->Clear();  
   fActive->Clear();
   fFileNodes->Clear();    
   PDB(kPacketizer,2)
      Info("TAdaptivePacketizer",
           "processing Range: First %lld, Num %lld", first, num);
   dset->Reset();
   Long64_t cur = 0;
   while (( e = (TDSetElement*)dset->Next())) {
      TUrl url = e->GetFileName();
      Long64_t eFirst = e->GetFirst();
      Long64_t eNum = e->GetNum();
      PDB(kPacketizer,2)
         Info("TAdaptivePacketizer",
              "processing element: First %lld, Num %lld (cur %lld)", eFirst, eNum, cur);
      if (!e->GetEventList()) {
         
         if (cur + eNum < first) {
            cur += eNum;
            PDB(kPacketizer,2)
               Info("TAdaptivePacketizer",
                    "processing element: skip element cur %lld", cur);
            continue;
         }
         
         if (num != -1 && (first+num <= cur)) {
            cur += eNum;
            PDB(kPacketizer,2)
               Info("TAdaptivePacketizer",
                    "processing element: drop element cur %lld", cur);
            continue; 
         }
         
         
         if (num != -1 && (first+num < cur+eNum)) {
            e->SetNum( first + num - cur );
            PDB(kPacketizer,2)
               Info("TAdaptivePacketizer",
                    "processing element: Adjust end %lld", first + num - cur);
         }
         
         
         if (cur < first) {
            e->SetFirst( eFirst + (first - cur) );
            e->SetNum( e->GetNum() - (first - cur) );
            PDB(kPacketizer,2)
               Info("TAdaptivePacketizer",
                    "processing element: Adjust start %lld and end %lld",
                    eFirst + (first - cur), first + num - cur);
         }
         cur += eNum;
      } else {
         if (e->GetEventList()->GetN() == 0)
            continue;
      }
      PDB(kPacketizer,2)
         Info("TAdaptivePacketizer",
              "processing element: next cur %lld", cur);
      
      TString host;
      if ( !url.IsValid() ||
          (strncmp(url.GetProtocol(),"root", 4) &&
           strncmp(url.GetProtocol(),"rfio", 4)) ) {
         host = "no-host";
      } else {
         host = url.GetHost();
      }
      TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
      if ( node == 0 ) {
         node = new TFileNode( host );
         fFileNodes->Add( node );
      }
      ++files;
      fTotalEntries += e->GetNum();
      node->Add( e );
      node->IncEvents(e->GetNum());
      PDB(kPacketizer,2) e->Print("a");
   }
   
   if (dset->GetEventList())
      fTotalEntries = dset->GetEventList()->GetN();
   PDB(kGlobal,1)
      Info("TAdaptivePacketizer", "processing %lld entries in %d files on %d hosts",
                                  fTotalEntries, files, fFileNodes->GetSize());
   Reset();
   
   
   Int_t noRemoteFiles = 0;
   fNEventsOnRemLoc = 0;
   Int_t totalNumberOfFiles = 0;
   TIter next(fFileNodes);
   while (TFileNode *fn = (TFileNode*)next()) {
      totalNumberOfFiles += fn->GetNumberOfFiles();
      if (fn->GetSlaveCnt() == 0) {
         noRemoteFiles += fn->GetNumberOfFiles();
         fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
      }
   }
   if (totalNumberOfFiles == 0) {
      
      fValid = kFALSE;
      return;
   }
   fFractionOfRemoteFiles = noRemoteFiles / totalNumberOfFiles;
   Info("TAdaptivePacketizer",
        "fraction of remote files %f", fFractionOfRemoteFiles);
   if (fValid) {
      Long_t period = 500;
      TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
      fProgress = new TTimer;
      fProgress->SetObject(this);
      fProgress->Start(period, kFALSE);
   }
   PDB(kPacketizer,1) Info("TAdaptivePacketizer", "return");
}
TAdaptivePacketizer::~TAdaptivePacketizer()
{
   
   if (fSlaveStats) {
      fSlaveStats->DeleteValues();
   }
   SafeDelete(fPackets);
   SafeDelete(fSlaveStats);
   SafeDelete(fUnAllocated);
   SafeDelete(fActive);
   SafeDelete(fFileNodes);
   SafeDelete(fProgress);
}
TAdaptivePacketizer::TFileStat *TAdaptivePacketizer::GetNextUnAlloc(TFileNode *node)
{
   
   
   
   TFileStat *file = 0;
   if (node != 0) {
      file = node->GetNextUnAlloc();
      if (file == 0) RemoveUnAllocNode(node);
   } else {
      while (file == 0 && ((node = NextNode()) != 0)) {
         file = node->GetNextUnAlloc();
         if (file == 0) RemoveUnAllocNode(node);
      }
   }
   if (file != 0) {
      
      if (fActive->FindObject(node) == 0) {
         fActive->Add(node);
      }
   }
   return file;
}
TAdaptivePacketizer::TFileNode *TAdaptivePacketizer::NextNode()
{
   
   
   fUnAllocated->Sort();
   PDB(kPacketizer,2) {
      fUnAllocated->Print();
   }
   TFileNode *fn = (TFileNode*) fUnAllocated->First();
   if (fn != 0 && fn->GetExtSlaveCnt() >= fgMaxSlaveCnt) {
      
      PDB(kPacketizer,1) Info("NextNode",
                              "Reached Slaves per Node Limit (%d)", fgMaxSlaveCnt);
      fn = 0;
   }
   return fn;
}
void TAdaptivePacketizer::RemoveUnAllocNode(TFileNode * node)
{
   
   fUnAllocated->Remove(node);
}
TAdaptivePacketizer::TFileStat *TAdaptivePacketizer::GetNextActive()
{
   
   TFileNode *node;
   TFileStat *file = 0;
   while (file == 0 && ((node = NextActiveNode()) != 0)) {
         file = node->GetNextActive();
         if (file == 0) RemoveActiveNode(node);
   }
   return file;
}
TAdaptivePacketizer::TFileNode *TAdaptivePacketizer::NextActiveNode()
{
   
   fActive->Sort();
   PDB(kPacketizer,2) {
      Info("NextActiveNode", "enter");
      fActive->Print();
   }
   TFileNode *fn = (TFileNode*) fActive->First();
   
   if (fn != 0 && fn->GetExtSlaveCnt() >= fgMaxSlaveCnt) {
      PDB(kPacketizer,1)
         Info("NextActiveNode","reached Workers-per-Node limit (%d)", fgMaxSlaveCnt);
      fn = 0;
   }
   return fn;
}
void TAdaptivePacketizer::RemoveActive(TFileStat *file)
{
   
   TFileNode *node = file->GetNode();
   node->RemoveActive(file);
   if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
}
void TAdaptivePacketizer::RemoveActiveNode(TFileNode *node)
{
   
   fActive->Remove(node);
}
void TAdaptivePacketizer::Reset()
{
   
   fUnAllocated->Clear();
   fUnAllocated->AddAll(fFileNodes);
   fActive->Clear();
   TIter files(fFileNodes);
   TFileNode *fn;
   while ((fn = (TFileNode*) files.Next()) != 0) {
      fn->Reset();
   }
   TIter slaves(fSlaveStats);
   TObject *key;
   while ((key = slaves.Next()) != 0) {
      TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
      TFileNode *fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
      if (fn != 0 ) {
         slstat->SetFileNode(fn);
         fn->IncMySlaveCnt();
      }
      slstat->fCurFile = 0;
   }
}
void TAdaptivePacketizer::ValidateFiles(TDSet *dset, TList *slaves)
{
   
   
   TMap     slaves_by_sock;
   TMonitor mon;
   TList    workers;
   
   workers.AddAll(slaves);
   TIter    si(slaves);
   TSlave   *slave;
   while ((slave = (TSlave*)si.Next()) != 0) {
      PDB(kPacketizer,3) Info("ValidateFiles","socket added to monitor: %p (%s)",
          slave->GetSocket(), slave->GetName());
      mon.Add(slave->GetSocket());
      slaves_by_sock.Add(slave->GetSocket(),slave);
   }
   mon.DeActivateAll();
   ((TProof*)gProof)->DeActivateAsyncInput();
   
   ((TProof*)gProof)->fCurrentMonitor = &mon;
   
   TString msg("Validating files");
   UInt_t n = 0;
   UInt_t tot = dset->GetListOfElements()->GetSize();
   Bool_t st = kTRUE;
   while (kTRUE) {
      
      while( TSlave *s = (TSlave*)workers.First() ) {
         workers.Remove(s);
         
         TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
         TFileNode *node = 0;
         TFileStat *file = 0;
         
         if ( (node = slstat->GetFileNode()) != 0 ) {
            file = GetNextUnAlloc(node);
            if ( file == 0 ) {
               slstat->SetFileNode(0);
            }
         }
         
         if (file == 0) {
            file = GetNextUnAlloc();
         }
         if ( file != 0 ) {
            
            RemoveActive(file);
            slstat->fCurFile = file;
            file->GetNode()->IncExtSlaveCnt(slstat->GetName());
            TMessage m(kPROOF_GETENTRIES);
            TDSetElement *elem = file->GetElement();
            m << dset->IsTree()
              << TString(elem->GetFileName())
              << TString(elem->GetDirectory())
              << TString(elem->GetObjName());
            s->GetSocket()->Send( m );
            mon.Activate(s->GetSocket());
            PDB(kPacketizer,2)
               Info("ValidateFiles",
                    "sent to slave-%s (%s) via %p GETENTRIES on %s %s %s %s",
                    s->GetOrdinal(), s->GetName(), s->GetSocket(),
                    dset->IsTree() ? "tree" : "objects",
                    elem->GetFileName(), elem->GetDirectory(), elem->GetObjName());
         }
      }
      if ( mon.GetActive() == 0 ) break; 
      PDB(kPacketizer,3) {
         Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
         TList *act = mon.GetListOfActives();
         TIter next(act);
         while (TSocket *s = (TSocket*) next()) {
            TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
            if (sl)
               Info("ValidateFiles", "   slave-%s (%s)", sl->GetOrdinal(), sl->GetName());
         }
         delete act;
      }
      TSocket *sock = mon.Select();
      mon.DeActivate(sock);
      PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
      TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
      TMessage *reply;
      if ( sock->Recv(reply) <= 0 ) {
         
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         Error("ValidateFiles", "Recv failed! for slave-%s (%s)",
               slave->GetOrdinal(), slave->GetName());
         continue;
         }
      if ( reply->What() == kPROOF_FATAL ) {
         Error("ValidateFiles", "kPROOF_FATAL from slave-%s (%s)",
               slave->GetOrdinal(), slave->GetName());
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         continue;
      } else if ( reply->What() == kPROOF_LOGFILE ) {
         PDB(kPacketizer,3) Info("ValidateFiles", "got logfile");
         Int_t size;
         (*reply) >> size;
         ((TProof*)gProof)->RecvLogFile(sock, size);
         mon.Activate(sock);
         continue;
      } else if ( reply->What() == kPROOF_LOGDONE ) {
         PDB(kPacketizer,3) Info("ValidateFiles", "got logdone");
         mon.Activate(sock);
         continue;
      } else if ( reply->What() != kPROOF_GETENTRIES ) {
         
         Error("ValidateFiles", "unexpected message type (%d) from slave-%s (%s)",
               reply->What(), slave->GetOrdinal(), slave->GetName());
         ((TProof*)gProof)->MarkBad(slave);
         fValid = kFALSE;
         continue;
      }
      TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
      TDSetElement *e = slavestat->fCurFile->GetElement();
      slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
      Long64_t entries;
      (*reply) >> entries;
      e->SetTDSetOffset(entries);
      if ( entries > 0 ) {
         if (!e->GetEventList()) {
            if ( e->GetFirst() > entries ) {
               Error("ValidateFiles", "first (%d) higher then number of entries (%d) in %d",
                     e->GetFirst(), entries, e->GetFileName() );
               
               slavestat->fCurFile->SetDone();
               fValid = kFALSE; 
            }
            if ( e->GetNum() == -1 ) {
               e->SetNum( entries - e->GetFirst() );
            } else if ( e->GetFirst() + e->GetNum() > entries ) {
               Error("ValidateFiles",
                     "Num (%d) + First (%d) larger then number of keys/entries (%d) in %s",
                     e->GetNum(), e->GetFirst(), entries, e->GetFileName() );
               e->SetNum( entries - e->GetFirst() );
            }
         }
         
         n++;
         gProof->SendDataSetStatus(msg, n, tot, st);
      } else {
         Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
         
         
         
         
         if (gProofServ) {
            TMessage m(kPROOF_MESSAGE);
            m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
            gProofServ->GetSocket()->Send(m);
         }
         
         if (dset->Remove(e) == -1)
            Error("ValidateFiles", "removing of not-registered element %p failed", e);
      }
      workers.Add(slave); 
   }
   
   ((TProof*)gProof)->ActivateAsyncInput();
   
   ((TProof*)gProof)->fCurrentMonitor = 0;
   
   if (!fValid)
      return;
   
   Long64_t offset = 0;
   Long64_t newOffset = 0;
   TIter next(dset->GetListOfElements());
   TDSetElement *el;
   while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
      newOffset = offset + el->GetTDSetOffset();
      el->SetTDSetOffset(offset);
      offset = newOffset;
   }
   if (dset->GetEventList()) {
      SplitEventList(dset);
   }
}
Long64_t TAdaptivePacketizer::GetEntriesProcessed(TSlave *slave) const
{
   
   if ( fSlaveStats == 0 ) return 0;
   TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
   if ( slstat == 0 ) return 0;
   return slstat->GetEntriesProcessed();
}
Int_t TAdaptivePacketizer::CalculatePacketSize(TObject *slStatPtr)
{
   
   
   TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
   Long64_t num;
   Int_t packetSizeAsFraction = 4;
   Float_t rate = slstat->GetCurRate();
   if (!rate)
      rate = slstat->GetAvgRate();
   if (rate) {
      Float_t avgProcRate = (fProcessed/(fCumProcTime / fSlaveStats->GetSize()));
      Float_t packetTime;
      packetTime = ((fTotalEntries - fProcessed)/avgProcRate)/packetSizeAsFraction;
      if (packetTime < 2)
         packetTime = 2;
      num = (Long64_t)(rate * packetTime);
   } else { 
      Int_t packetSize = (fTotalEntries - fProcessed)
                         / (8 * packetSizeAsFraction * fSlaveStats->GetSize());
      num = Long64_t(packetSize *
            ((Float_t)slstat->fSlave->GetPerfIdx() / fMaxPerfIdx));
   }
   if (num < 1) num = 1;
   return num;
}
TDSetElement *TAdaptivePacketizer::GetNextPacket(TSlave *sl, TMessage *r)
{
   
   
   
   
   
   
   
   if ( !fValid ) {
      return 0;
   }
   
   TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
   R__ASSERT( slstat != 0 );
   
   if ( slstat->fCurElem != 0 ) {
      Double_t latency, proctime, proccpu;
      Long64_t bytesRead = -1;
      Long64_t totalEntries = -1;
      Long64_t numev = slstat->fCurElem->GetNum();
      fPackets->Add(slstat->fCurElem);
      (*r) >> latency >> proctime >> proccpu;
      
      if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
      if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
      Long64_t totev = 0;
      if (r->BufferSize() > r->Length()) (*r) >> totev;
      
      if (totev > 0)
         numev = totev - slstat->fProcessed;
      fProcessed += ((numev > 0) ? numev : 0);
      fBytesRead += ((bytesRead > 0) ? bytesRead : 0);
      
      slstat->UpdateRates(numev, proctime);
      fCumProcTime += proctime;
      PDB(kPacketizer,2)
         Info("GetNextPacket","slave-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
              sl->GetOrdinal(), sl->GetName(),
              numev, latency, proctime, proccpu, bytesRead);
      if (gPerfStats != 0) {
         gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
                                 numev, latency, proctime, proccpu, bytesRead);
      }
      slstat->fCurElem = 0;
      if ( fProcessed == fTotalEntries ) {
         HandleTimer(0);   
         delete fProgress; fProgress = 0;
      }
   }
   if ( fStop ) {
      HandleTimer(0);
      return 0;
   }
   TFileStat *file = slstat->fCurFile;
   
   if ( file != 0 && file->IsDone() ) {
      file->GetNode()->DecExtSlaveCnt(slstat->GetName());
      file->GetNode()->DecRunSlaveCnt();
      if (gPerfStats != 0) {
         gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
                               file->GetElement()->GetFileName(), kFALSE);
      }
      file = 0;
   }
   Long64_t avgEventsLeftPerSlave =
      (fTotalEntries - fProcessed) / fSlaveStats->GetSize();
   if (fTotalEntries == fProcessed)
      return 0;
   
   if ( file == 0) {
      
      Bool_t openLocal;
      
      Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
                                (0.4 *(fTotalEntries - fProcessed)));
      if ( slstat->GetFileNode() != 0 ) {
         
         fUnAllocated->Sort();
         TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
         Bool_t nonLocalNodePossible =
            firstNonLocalNode?(firstNonLocalNode->GetExtSlaveCnt() < fgMaxSlaveCnt):0;
         openLocal = !nonLocalNodePossible;
         Float_t slaveRate = slstat->GetAvgRate();
         if ( nonLocalNodePossible ) {
            
            if ( slstat->GetFileNode()->GetRunSlaveCnt() >
                 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
                
                
                  openLocal = kTRUE;
            else if ( slaveRate == 0 ) { 
               
               
               if ( slstat->GetLocalEventsLeft() * localPreference > (avgEventsLeftPerSlave))
                  openLocal = kTRUE;
               else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
                     < slstat->GetLocalEventsLeft() * localPreference )
                  openLocal = kTRUE;
               else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
                  openLocal = kTRUE;
               else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
                  openLocal = kTRUE;
            } else {
               
               Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
               
               Float_t avgTime = avgEventsLeftPerSlave/(fProcessed/fCumProcTime);
               if (slaveTime * localPreference > avgTime)
                  openLocal = kTRUE;
               else if ((firstNonLocalNode->GetEventsLeftPerSlave())
                        < slstat->GetLocalEventsLeft() * localPreference)
                  openLocal = kTRUE;
            }
         }
         if (openLocal) {
            
            file = slstat->GetFileNode()->GetNextUnAlloc();
            if (!file)
               file = slstat->GetFileNode()->GetNextActive();
            if ( file == 0 ) {
               
               slstat->SetFileNode(0);
            }
         }
      }
      
      if(file == 0) {
         file = GetNextUnAlloc();
      }
      
      if(file == 0) {
         file = GetNextActive();
      }
      if ( file == 0 ) return 0;
      slstat->fCurFile = file;
      
      if (file->GetNode()->GetMySlaveCnt() == 0 &&
         file->GetElement()->GetFirst() == file->GetNextEntry()) {
         fNEventsOnRemLoc -= file->GetElement()->GetNum();
         R__ASSERT(fNEventsOnRemLoc >= 0);
      }
      file->GetNode()->IncExtSlaveCnt(slstat->GetName());
      file->GetNode()->IncRunSlaveCnt();
      if (gPerfStats != 0) {
         gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
                               file->GetNode()->GetName(),
                               file->GetElement()->GetFileName(), kTRUE);
      }
   }
   Long64_t num = CalculatePacketSize(slstat);
   
   TDSetElement *base = file->GetElement();
   Long64_t first = file->GetNextEntry();
   Long64_t last = base->GetFirst() + base->GetNum();
   
   if ( first + num >= last ) {
      num = last - first;
      file->SetDone(); 
      
      RemoveActive(file);
   } else {
      file->MoveNextEntry(num);
   }
   slstat->fCurElem = CreateNewPacket(base, first, num);
   if (base->GetEventList()) {
      
      TEventList *evl = new TEventList();
      for (; num > 0; num--, first++)
         evl->Enter(base->GetEventList()->GetEntry((Int_t)first));
      slstat->fCurElem->SetEventList(evl);
   }
   return slstat->fCurElem;
}
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.