#include <fcntl.h>
#include <errno.h>
#ifdef WIN32
#   include <io.h>
#   include <sys/stat.h>
#   include <sys/types.h>
#else
#   include <unistd.h>
#endif
#include <vector>
#ifdef R__HAVE_CONFIG
#include "RConfigure.h"
#endif
#include "Riostream.h"
#include "Getline.h"
#include "TBrowser.h"
#include "TChain.h"
#include "TCondor.h"
#include "TDSet.h"
#include "TError.h"
#include "TEnv.h"
#include "TEventList.h"
#include "TFile.h"
#include "TFileInfo.h"
#include "TFTP.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TMutex.h"
#include "TObjArray.h"
#include "TObjString.h"
#include "TParameter.h"
#include "TProof.h"
#include "TProofNodeInfo.h"
#include "TVirtualProofPlayer.h"
#include "TProofServ.h"
#include "TPluginManager.h"
#include "TQueryResult.h"
#include "TRandom.h"
#include "TRegexp.h"
#include "TROOT.h"
#include "TSemaphore.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TSortedList.h"
#include "TSystem.h"
#include "TThread.h"
#include "TTree.h"
#include "TUrl.h"
TProof *gProof = 0;
TVirtualMutex *gProofMutex = 0;
TList   *TProof::fgProofEnvList = 0;  
ClassImp(TProof)
TProofThreadArg::TProofThreadArg(const char *h, Int_t po, const char *o,
                                 Int_t pe, const char *i, const char *w,
                                 TList *s, TProof *prf)
  : fOrd(o), fPerf(pe), fImage(i), fWorkdir(w),
    fSlaves(s), fProof(prf), fCslave(0), fClaims(0),
    fType(TSlave::kSlave)
{
   
   fUrl = new TUrl(Form("%s:%d",h,po));
}
TProofThreadArg::TProofThreadArg(TCondorSlave *csl, TList *clist,
                                 TList *s, TProof *prf)
  : fUrl(0), fOrd(0), fPerf(-1), fImage(0), fWorkdir(0),
    fSlaves(s), fProof(prf), fCslave(csl), fClaims(clist),
    fType(TSlave::kSlave)
{
   
   if (csl) {
      fUrl     = new TUrl(Form("%s:%d",csl->fHostname.Data(),csl->fPort));
      fImage   = csl->fImage;
      fOrd     = csl->fOrdinal;
      fWorkdir = csl->fWorkDir;
      fPerf    = csl->fPerfIdx;
   }
}
TProofThreadArg::TProofThreadArg(const char *h, Int_t po, const char *o,
                                 const char *i, const char *w, const char *m,
                                TList *s, TProof *prf)
  : fOrd(o), fPerf(-1), fImage(i), fWorkdir(w),
    fMsd(m), fSlaves(s), fProof(prf), fCslave(0), fClaims(0),
    fType(TSlave::kSlave)
{
   
   fUrl = new TUrl(Form("%s:%d",h,po));
}
Bool_t TProofInterruptHandler::Notify()
{
   
   Info("Notify","Processing interrupt signal ...");
   
   fProof->StopProcess(kTRUE);
   
   fProof->Interrupt(TProof::kLocalInterrupt);
   return kTRUE;
}
TProofInputHandler::TProofInputHandler(TProof *p, TSocket *s)
                   : TFileHandler(s->GetDescriptor(),1),
                     fSocket(s), fProof(p)
{
   
}
Bool_t TProofInputHandler::Notify()
{
   
   fProof->CollectInputFrom(fSocket);
   return kTRUE;
}
ClassImp(TSlaveInfo)
Int_t TSlaveInfo::Compare(const TObject *obj) const
{
   
   if (!obj) return 1;
   const TSlaveInfo *si = dynamic_cast<const TSlaveInfo*>(obj);
   if (!si) return fOrdinal.CompareTo(obj->GetName());
   const char *myord = GetOrdinal();
   const char *otherord = si->GetOrdinal();
   while (myord && otherord) {
      Int_t myval = atoi(myord);
      Int_t otherval = atoi(otherord);
      if (myval < otherval) return 1;
      if (myval > otherval) return -1;
      myord = strchr(myord, '.');
      if (myord) myord++;
      otherord = strchr(otherord, '.');
      if (otherord) otherord++;
   }
   if (myord) return -1;
   if (otherord) return 1;
   return 0;
}
void TSlaveInfo::Print(Option_t *opt) const
{
   
   
   
   
   TString stat = fStatus == kActive ? "active" :
                  fStatus == kBad ? "bad" :
                  "not active";
   TString msd  = fMsd.IsNull() ? "<null>" : fMsd.Data();
   if (!opt) opt = "";
   if (!strcmp(opt, "active") && fStatus != kActive)
      return;
   if (!strcmp(opt, "notactive") && fStatus != kNotActive)
      return;
   if (!strcmp(opt, "bad") && fStatus != kBad)
      return;
   cout << "Slave: "          << fOrdinal
        << "  hostname: "     << fHostName
        << "  msd: "          << msd
        << "  perf index: "   << fPerfIndex
        << "  "               << stat
        << endl;
}
static char *CollapseSlashesInPath(const char *path)
{
   
   
   if (path) {
      Int_t i = 1; 
      Int_t j = 0; 
      char *newPath = new char [strlen(path) + 1];
      newPath[0] = path[0];
      while (path[i]) {
         if (path[i] != '/' || newPath[j] != '/') {
            j++;
            newPath[j] = path[i];
         }
         i++;
      }
      if (newPath[j] != '/')
         j++;
      newPath[j] = 0; 
      return newPath;
   }
   return 0;
}
ClassImp(TProof)
TSemaphore    *TProof::fgSemaphore = 0;
TProof::TProof(const char *masterurl, const char *conffile, const char *confdir,
               Int_t loglevel, const char *alias, TProofMgr *mgr)
       : fUrl(masterurl)
{
   
   
   
   
   
   
   
   
   
   
   
   fManager = mgr;
   
   fServType = TProofMgr::kXProofd;
   
   fQueryMode = kSync;
   if (!conffile || strlen(conffile) == 0)
      conffile = kPROOF_ConfFile;
   if (!confdir  || strlen(confdir) == 0)
      confdir = kPROOF_ConfDir;
   Init(masterurl, conffile, confdir, loglevel, alias);
   
   
   if (mgr) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(mgr);
      gROOT->GetListOfSockets()->Add(mgr);
   }
   
   if (IsProofd() || IsMaster())
      gROOT->GetListOfProofs()->Add(this);
   
   gProof = this;
}
TProof::TProof() : fUrl(""), fServType(TProofMgr::kXProofd)
{
   
   
   
   
   
   
   gROOT->GetListOfProofs()->Add(this);
   gProof = this;
}
TProof::~TProof()
{
   
   while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
      
      chain->SetProof(0);
      RemoveChain(chain);
   }
   
   if (!IsMaster()) {
      
      TIter nextpackage(fEnabledPackagesOnClient);
      while (TObjString *package = dynamic_cast<TObjString*>(nextpackage())) {
         FileStat_t stat;
         gSystem->GetPathInfo(package->String(), stat);
         
         
         
         if (stat.fIsLink)
            gSystem->Unlink(package->String());
      }
   }
   Close();
   SafeDelete(fIntHandler);
   SafeDelete(fSlaves);
   SafeDelete(fActiveSlaves);
   SafeDelete(fInactiveSlaves);
   SafeDelete(fUniqueSlaves);
   SafeDelete(fAllUniqueSlaves);
   SafeDelete(fNonUniqueMasters);
   SafeDelete(fBadSlaves);
   SafeDelete(fAllMonitor);
   SafeDelete(fActiveMonitor);
   SafeDelete(fUniqueMonitor);
   SafeDelete(fAllUniqueMonitor);
   SafeDelete(fSlaveInfo);
   SafeDelete(fChains);
   SafeDelete(fPlayer);
   SafeDelete(fFeedback);
   SafeDelete(fWaitingSlaves);
   SafeDelete(fAvailablePackages);
   SafeDelete(fEnabledPackages);
   SafeDelete(fEnabledPackagesOnClient);
   SafeDelete(fPackageLock);
   SafeDelete(fGlobalPackageDirList);
   
   if (!IsMaster()) {
      if (fLogFileR)
         fclose(fLogFileR);
      if (fLogFileW)
         fclose(fLogFileW);
      if (fLogFileName.Length())
         gSystem->Unlink(fLogFileName);
   }
   
   Emit("~TProof()");
}
Int_t TProof::Init(const char *masterurl, const char *conffile,
                   const char *confdir, Int_t loglevel, const char *alias)
{
   
   
   
   
   
   
   R__ASSERT(gSystem);
   fValid = kFALSE;
   if (strlen(fUrl.GetOptions()) > 0 && !(strncmp(fUrl.GetOptions(),"std",3))) {
      fServType = TProofMgr::kProofd;
      fUrl.SetOptions("");
   }
   if (!masterurl || !*masterurl) {
      fUrl.SetProtocol("proof");
      fUrl.SetHost("__master__");
   } else if (!(strstr(masterurl, "://"))) {
      fUrl.SetProtocol("proof");
   }
   if (fUrl.GetPort() == TUrl(" ").GetPort())
      fUrl.SetPort(TUrl("proof:// ").GetPort());
   
   Bool_t attach = kFALSE;
   if (strlen(fUrl.GetOptions()) > 0) {
      attach = kTRUE;
      
      TString opts = fUrl.GetOptions();
      if (opts.Contains("GUI")) {
         SetBit(TProof::kUsingSessionGui);
         opts.Remove(opts.Index("GUI"));
         fUrl.SetOptions(opts);
      }
   }
   if (strlen(fUrl.GetUser()) <= 0) {
      
      UserGroup_t *pw = gSystem->GetUserInfo();
      if (pw) {
         fUrl.SetUser(pw->fUser);
         delete pw;
      }
   }
   
   
   
   if (!strlen(fUrl.GetHost()))
      fMaster = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
   else
      fMaster = gSystem->GetHostByName(fUrl.GetHost()).GetHostName();
   fConfDir        = confdir;
   fConfFile       = conffile;
   fWorkDir        = gSystem->WorkingDirectory();
   fLogLevel       = loglevel;
   fProtocol       = kPROOF_Protocol;
   fMasterServ     = (fMaster == "__master__") ? kTRUE : kFALSE;
   fSendGroupView  = kTRUE;
   fImage          = fMasterServ ? "" : "<local>";
   fIntHandler     = 0;
   fStatus         = 0;
   fSlaveInfo      = 0;
   fChains         = new TList;
   fAvailablePackages = 0;
   fEnabledPackages = 0;
   fEndMaster      = IsMaster() ? kTRUE : kFALSE;
   
   if (!IsMaster())
      fDataPoolUrl.Form("root://%s", fMaster.Data());
   else
      fDataPoolUrl = "";
   fProgressDialog        = 0;
   fProgressDialogStarted = kFALSE;
   
   TString      al = (alias) ? alias : fMaster.Data();
   SetAlias(al);
   
   fRedirLog = kFALSE;
   if (!IsMaster()) {
      fLogFileName    = "ProofLog_";
      if ((fLogFileW = gSystem->TempFileName(fLogFileName)) == 0)
         Error("Init", "could not create temporary logfile");
      if ((fLogFileR = fopen(fLogFileName, "r")) == 0)
         Error("Init", "could not open temp logfile for reading");
   }
   fLogToWindowOnly = kFALSE;
   
   fIdle = kTRUE;
   
   fSync = kTRUE;
   
   fQueries = 0;
   fOtherQueries = 0;
   fDrawQueries = 0;
   fMaxDrawQueries = 1;
   fSeqNum = 0;
   
   fSessionID = -1;
   
   fWaitingSlaves = 0;
   
   fPlayer = 0;
   MakePlayer();
   fFeedback = new TList;
   fFeedback->SetOwner();
   fFeedback->SetName("FeedbackList");
   AddInput(fFeedback);
   
   fSlaves           = new TSortedList(kSortDescending);
   fActiveSlaves     = new TList;
   fInactiveSlaves   = new TList;
   fUniqueSlaves     = new TList;
   fAllUniqueSlaves  = new TList;
   fNonUniqueMasters = new TList;
   fBadSlaves        = new TList;
   fAllMonitor       = new TMonitor;
   fActiveMonitor    = new TMonitor;
   fUniqueMonitor    = new TMonitor;
   fAllUniqueMonitor = new TMonitor;
   fCurrentMonitor   = 0;
   fPackageLock             = 0;
   fEnabledPackagesOnClient = 0;
   fGlobalPackageDirList    = 0;
   if (!IsMaster()) {
      fPackageDir = kPROOF_WorkDir;
      gSystem->ExpandPathName(fPackageDir);
      if (gSystem->AccessPathName(fPackageDir)) {
         if (gSystem->MakeDirectory(fPackageDir) == -1) {
            Error("Init", "failure creating directory %s", fPackageDir.Data());
            return 0;
         }
      }
      fPackageDir += TString("/") + kPROOF_PackDir;
      if (gSystem->AccessPathName(fPackageDir)) {
         if (gSystem->MakeDirectory(fPackageDir) == -1) {
            Error("Init", "failure creating directory %s", fPackageDir.Data());
            return 0;
         }
      }
      
      TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
      if (globpack.Length() > 0) {
         Int_t ng = 0;
         Int_t from = 0;
         TString ldir;
         while (globpack.Tokenize(ldir, from, ":")) {
            if (gSystem->AccessPathName(ldir, kReadPermission)) {
               Warning("Init", "directory for global packages %s does not"
                               " exist or is not readable", ldir.Data());
            } else {
               
               TString key = Form("G%d", ng++);
               if (!fGlobalPackageDirList) {
                  fGlobalPackageDirList = new THashList();
                  fGlobalPackageDirList->SetOwner();
               }
               fGlobalPackageDirList->Add(new TNamed(key,ldir));
            }
         }
      }
      UserGroup_t *ug = gSystem->GetUserInfo();
      fPackageLock = new TProofLockPath(Form("%s%s", kPROOF_PackageLockFile, ug->fUser.Data()));
      delete ug;
      fEnabledPackagesOnClient = new TList;
      fEnabledPackagesOnClient->SetOwner();
   }
   
   Bool_t parallelStartup = kFALSE;
   if (!attach && IsMaster()) {
      parallelStartup = gEnv->GetValue("Proof.ParallelStartup", kFALSE);
      PDB(kGlobal,1) Info("Init", "Parallel Startup: %s",
                          parallelStartup ? "kTRUE" : "kFALSE");
      if (parallelStartup) {
         
         TString threadLib = "libThread";
         char *p;
         if ((p = gSystem->DynamicPathName(threadLib, kTRUE))) {
            delete[]p;
            if (gSystem->Load(threadLib) == -1) {
               Warning("Init",
                       "Cannot load libThread: switch to serial startup (%s)",
                       threadLib.Data());
               parallelStartup = kFALSE;
            }
         } else {
            Warning("Init",
                    "Cannot find libThread: switch to serial startup (%s)",
                    threadLib.Data());
            parallelStartup = kFALSE;
         }
         
         Int_t parallelRequests = gEnv->GetValue("Proof.ParallelStartupRequests", 0);
         if (parallelRequests > 0) {
            PDB(kGlobal,1)
               Info("Init", "Parallel Startup Requests: %d", parallelRequests);
            fgSemaphore = new TSemaphore((UInt_t)(parallelRequests));
         }
      }
   }
   
   if (!StartSlaves(parallelStartup, attach))
      return 0;
   if (fgSemaphore)
      SafeDelete(fgSemaphore);
   
   fValid = kTRUE;
   
   fAllMonitor->DeActivateAll();
   
   GoParallel(9999, attach);
   
   if (!attach)
      SendInitialState();
   else if (!IsIdle())
      
      fRedirLog = kTRUE;
   
   if (!IsMaster())
      SetAlias(al);
   SetActive(kFALSE);
   if (IsValid()) {
      
      ActivateAsyncInput();
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Add(this);
   }
   return fActiveSlaves->GetSize();
}
void TProof::SetManager(TProofMgr *mgr)
{
   
   
   fManager = mgr;
   if (mgr) {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(mgr);
      gROOT->GetListOfSockets()->Add(mgr);
   }
}
Bool_t TProof::StartSlaves(Bool_t parallel, Bool_t attach)
{
   
   
   
   if (IsMaster()) {
      Int_t pc = 0;
      TList *workerList = new TList;
      
      if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
         Error("StartSlaves", "getting list of worker nodes");
         return kFALSE;
      }
      fImage = gProofServ->GetImage();
      
      UInt_t nSlaves = workerList->GetSize();
      UInt_t nSlavesDone = 0;
      Int_t ord = 0;
      
      std::vector<TProofThread *> thrHandlers;
      if (parallel) {
         thrHandlers.reserve(nSlaves);
         if (thrHandlers.max_size() < nSlaves) {
            PDB(kGlobal,1)
               Info("StartSlaves","cannot reserve enough space for thread"
                    " handlers - switch to serial startup");
            parallel = kFALSE;
         }
      }
      
      TListIter next(workerList);
      TObject *to;
      TProofNodeInfo *worker;
      while ((to = next())) {
         
         worker = (TProofNodeInfo *)to;
         
         const Char_t *image = worker->GetImage().Data();
         const Char_t *workdir = worker->GetWorkDir().Data();
         Int_t perfidx = worker->GetPerfIndex();
         Int_t sport = worker->GetPort();
         if (sport == -1)
            sport = fUrl.GetPort();
         
         TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
         if (parallel) {
            
            TProofThreadArg *ta =
               new TProofThreadArg(worker->GetNodeName().Data(), sport,
                                   fullord, perfidx, image, workdir,
                                   fSlaves, this);
            if (ta) {
               
               TThread *th = new TThread(SlaveStartupThread, ta);
               if (!th) {
                  Info("StartSlaves","Can't create startup thread:"
                       " out of system resources");
                  SafeDelete(ta);
               } else {
                  
                  thrHandlers.push_back(new TProofThread(th, ta));
                  
                  th->Run();
                  
                  nSlavesDone++;
                  TMessage m(kPROOF_SERVERSTARTED);
                  m << TString("Opening connections to workers") << nSlaves
                    << nSlavesDone << kTRUE;
                  gProofServ->GetSocket()->Send(m);
               }
            } 
            else {
               Info("StartSlaves","Can't create thread arguments object:"
                    " out of system resources");
            }
         } 
         else {
            
            TUrl u(Form("%s:%d",worker->GetNodeName().Data(), sport));
            
            if (strlen(gProofServ->GetGroup()) > 0) {
               
               if (strlen(u.GetUser()) <= 0)
                  u.SetUser(gProofServ->GetUser());
               u.SetPasswd(gProofServ->GetGroup());
            }
            TSlave *slave = CreateSlave(u.GetUrl(), fullord, perfidx,
                                        image, workdir);
            
            
            Bool_t slaveOk = kTRUE;
            if (slave->IsValid()) {
               fSlaves->Add(slave);
            } else {
               slaveOk = kFALSE;
               fBadSlaves->Add(slave);
            }
            PDB(kGlobal,3)
               Info("StartSlaves", "worker on host %s created"
                    " and added to list", worker->GetNodeName().Data());
            
            nSlavesDone++;
            TMessage m(kPROOF_SERVERSTARTED);
            m << TString("Opening connections to workers") << nSlaves
              << nSlavesDone << slaveOk;
            gProofServ->GetSocket()->Send(m);
         }
         ord++;
      } 
      
      SafeDelete(workerList);
      nSlavesDone = 0;
      if (parallel) {
         
         std::vector<TProofThread *>::iterator i;
         for (i = thrHandlers.begin(); i != thrHandlers.end(); ++i) {
            TProofThread *pt = *i;
            
            if (pt && pt->fThread->GetState() == TThread::kRunningState) {
               PDB(kGlobal,3)
                  Info("Init",
                       "parallel startup: waiting for worker %s (%s:%d)",
                        pt->fArgs->fOrd.Data(), pt->fArgs->fUrl->GetHost(),
                        pt->fArgs->fUrl->GetPort());
               pt->fThread->Join();
            }
            
            nSlavesDone++;
            TMessage m(kPROOF_SERVERSTARTED);
            m << TString("Setting up worker servers") << nSlaves
              << nSlavesDone << kTRUE;
            gProofServ->GetSocket()->Send(m);
         }
         TIter next(fSlaves);
         TSlave *sl = 0;
         while ((sl = (TSlave *)next())) {
            if (sl->IsValid())
               fAllMonitor->Add(sl->GetSocket());
            else
               fBadSlaves->Add(sl);
         }
         
         while (!thrHandlers.empty()) {
            i = thrHandlers.end()-1;
            if (*i) {
               SafeDelete(*i);
               thrHandlers.erase(i);
            }
         }
      } else {
         
         
         TIter nxsl(fSlaves);
         TSlave *sl = 0;
         while ((sl = (TSlave *) nxsl())) {
            
            if (sl->IsValid())
               sl->SetupServ(TSlave::kSlave, 0);
            
            Bool_t slaveOk = kTRUE;
            if (sl->IsValid()) {
               fAllMonitor->Add(sl->GetSocket());
            } else {
               slaveOk = kFALSE;
               fBadSlaves->Add(sl);
            }
            
            nSlavesDone++;
            TMessage m(kPROOF_SERVERSTARTED);
            m << TString("Setting up worker servers") << nSlaves
              << nSlavesDone << slaveOk;
            gProofServ->GetSocket()->Send(m);
         }
      }
   } else {
      
      fprintf(stderr,"Starting master: opening connection ... \n");
      TSlave *slave = CreateSubmaster(fUrl.GetUrl(), "0", "master", 0);
      if (slave->IsValid()) {
         
         fprintf(stderr,"Starting master:"
                        " connection open: setting up server ...             \r");
         StartupMessage("Connection to master opened", kTRUE, 1, 1);
         if (!attach) {
            
            slave->SetInterruptHandler(kTRUE);
            
            slave->SetupServ(TSlave::kMaster, fConfFile);
            if (slave->IsValid()) {
               
               fprintf(stderr,"Starting master: OK                                     \n");
               StartupMessage("Master started", kTRUE, 1, 1);
               
               
               if (fProtocol == 1) {
                  Error("StartSlaves",
                        "client and remote protocols not compatible (%d and %d)",
                        kPROOF_Protocol, fProtocol);
                  slave->Close("S");
                  delete slave;
                  return kFALSE;
               }
               fSlaves->Add(slave);
               fAllMonitor->Add(slave->GetSocket());
               
               slave->SetInterruptHandler(kFALSE);
               
               fIntHandler = new TProofInterruptHandler(this);
               Collect(slave);
               Int_t slStatus = slave->GetStatus();
               if (slStatus == -99 || slStatus == -98) {
                  fSlaves->Remove(slave);
                  fAllMonitor->Remove(slave->GetSocket());
                  if (slStatus == -99)
                     Error("StartSlaves", "not allowed to connect to PROOF master server");
                  else if (slStatus == -98)
                     Error("StartSlaves", "could not setup output redirection on master");
                  else
                     Error("StartSlaves", "setting up master");
                  slave->Close("S");
                  delete slave;
                  return 0;
               }
               if (!slave->IsValid()) {
                  fSlaves->Remove(slave);
                  fAllMonitor->Remove(slave->GetSocket());
                  slave->Close("S");
                  delete slave;
                  Error("StartSlaves",
                        "failed to setup connection with PROOF master server");
                  return kFALSE;
               }
               if (!gROOT->IsBatch()) {
                  if ((fProgressDialog =
                     gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
                     if (fProgressDialog->LoadPlugin() == -1)
                        fProgressDialog = 0;
               }
            } else {
               
               fprintf(stderr,"Starting master: failure\n");
            }
         } else {
            
            if (attach) {
               fprintf(stderr,"Starting master: OK                                     \n");
               StartupMessage("Master attached", kTRUE, 1, 1);
               if (!gROOT->IsBatch()) {
                  if ((fProgressDialog =
                     gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
                     if (fProgressDialog->LoadPlugin() == -1)
                        fProgressDialog = 0;
               }
            } else {
               fprintf(stderr,"Starting manager: OK                                    \n");
               StartupMessage("Manager started", kTRUE, 1, 1);
            }
            fSlaves->Add(slave);
            fAllMonitor->Add(slave->GetSocket());
            fIntHandler = new TProofInterruptHandler(this);
         }
      } else {
         delete slave;
         Error("StartSlaves", "failed to connect to a PROOF master server");
         return kFALSE;
      }
   }
   return kTRUE;
}
void TProof::Close(Option_t *opt)
{
   
   
   
   
   if (fSlaves) {
      if (fIntHandler)
         fIntHandler->Remove();
      TIter nxs(fSlaves);
      TSlave *sl = 0;
      while ((sl = (TSlave *)nxs()))
         sl->Close(opt);
      fActiveSlaves->Clear("nodelete");
      fUniqueSlaves->Clear("nodelete");
      fAllUniqueSlaves->Clear("nodelete");
      fNonUniqueMasters->Clear("nodelete");
      fBadSlaves->Clear("nodelete");
      fSlaves->Delete();
   }
   {
      R__LOCKGUARD2(gROOTMutex);
      gROOT->GetListOfSockets()->Remove(this);
      if (IsProofd()) {
         gROOT->GetListOfProofs()->Remove(this);
         if (gProof && gProof == this) {
            
            TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
            while ((gProof = (TProof *)pvp())) {
               if (gProof->IsProofd())
                  break;
            }
         }
      }
   }
}
TSlave *TProof::CreateSlave(const char *url, const char *ord,
                            Int_t perf, const char *image, const char *workdir)
{
   
   
   
   TSlave* sl = TSlave::Create(url, ord, perf, image,
                               this, TSlave::kSlave, workdir, 0);
   if (sl->IsValid()) {
      sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
      
      
      sl->fParallel = 1;
   }
   return sl;
}
TSlave *TProof::CreateSubmaster(const char *url, const char *ord,
                                const char *image, const char *msd)
{
   
   
   
   TSlave *sl = TSlave::Create(url, ord, 100, image, this,
                               TSlave::kMaster, 0, msd);
   if (sl->IsValid()) {
      sl->SetInputHandler(new TProofInputHandler(this, sl->GetSocket()));
   }
   return sl;
}
TSlave *TProof::FindSlave(TSocket *s) const
{
   
   TSlave *sl;
   TIter   next(fSlaves);
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid() && sl->GetSocket() == s)
         return sl;
   }
   return 0;
}
void TProof::FindUniqueSlaves()
{
   
   
   
   
   
   
   
   fUniqueSlaves->Clear();
   fUniqueMonitor->RemoveAll();
   fAllUniqueSlaves->Clear();
   fAllUniqueMonitor->RemoveAll();
   fNonUniqueMasters->Clear();
   TIter next(fActiveSlaves);
   while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
      if (fImage == sl->fImage) {
         if (sl->GetSlaveType() == TSlave::kMaster) {
            fNonUniqueMasters->Add(sl);
            fAllUniqueSlaves->Add(sl);
            fAllUniqueMonitor->Add(sl->GetSocket());
         }
         continue;
      }
      TIter next2(fUniqueSlaves);
      TSlave *replace_slave = 0;
      Bool_t add = kTRUE;
      while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
         if (sl->fImage == sl2->fImage) {
            add = kFALSE;
            if (sl->GetSlaveType() == TSlave::kMaster) {
               if (sl2->GetSlaveType() == TSlave::kSlave) {
                  
                  replace_slave = sl2;
                  add = kTRUE;
               } else if (sl2->GetSlaveType() == TSlave::kMaster) {
                  fNonUniqueMasters->Add(sl);
                  fAllUniqueSlaves->Add(sl);
                  fAllUniqueMonitor->Add(sl->GetSocket());
               } else {
                  Error("FindUniqueSlaves", "TSlave is neither Master nor Slave");
                  R__ASSERT(0);
               }
            }
            break;
         }
      }
      if (add) {
         fUniqueSlaves->Add(sl);
         fAllUniqueSlaves->Add(sl);
         fUniqueMonitor->Add(sl->GetSocket());
         fAllUniqueMonitor->Add(sl->GetSocket());
         if (replace_slave) {
            fUniqueSlaves->Remove(replace_slave);
            fAllUniqueSlaves->Remove(replace_slave);
            fUniqueMonitor->Remove(replace_slave->GetSocket());
            fAllUniqueMonitor->Remove(replace_slave->GetSocket());
         }
      }
   }
   
   fUniqueMonitor->DeActivateAll();
   fAllUniqueMonitor->DeActivateAll();
}
Int_t TProof::GetNumberOfSlaves() const
{
   
   return fSlaves->GetSize();
}
Int_t TProof::GetNumberOfActiveSlaves() const
{
   
   
   return fActiveSlaves->GetSize();
}
Int_t TProof::GetNumberOfInactiveSlaves() const
{
   
   
   return fInactiveSlaves->GetSize();
}
Int_t TProof::GetNumberOfUniqueSlaves() const
{
   
   
   return fUniqueSlaves->GetSize();
}
Int_t TProof::GetNumberOfBadSlaves() const
{
   
   
   return fBadSlaves->GetSize();
}
void TProof::AskStatistics()
{
   
   if (!IsValid()) return;
   Broadcast(kPROOF_GETSTATS, kActive);
   Collect(kActive);
}
void TProof::AskParallel()
{
   
   if (!IsValid()) return;
   Broadcast(kPROOF_GETPARALLEL, kActive);
   Collect(kActive);
}
TList *TProof::GetListOfQueries(Option_t *opt)
{
   
   if (!IsValid() || IsMaster()) return (TList *)0;
   Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
   TMessage m(kPROOF_QUERYLIST);
   m << all;
   Broadcast(m, kActive);
   Collect(kActive);
   
   return fQueries;
}
Int_t TProof::GetNumberOfQueries()
{
   
   if (fQueries)
      return fQueries->GetSize() - fOtherQueries;
   return 0;
}
void TProof::SetMaxDrawQueries(Int_t max)
{
   
   if (max > 0) {
      if (fPlayer)
         fPlayer->SetMaxDrawQueries(max);
      fMaxDrawQueries = max;
   }
}
void TProof::GetMaxQueries()
{
   
   
   TMessage m(kPROOF_MAXQUERIES);
   m << kFALSE;
   Broadcast(m, kActive);
   Collect(kActive);
}
TList *TProof::GetQueryResults()
{
   
   return fPlayer->GetListOfResults();
}
TQueryResult *TProof::GetQueryResult(const char *ref)
{
   
   
   return fPlayer->GetQueryResult(ref);
}
void TProof::ShowQueries(Option_t *opt)
{
   
   
   
   
   
   
   
   
   
   
   Bool_t help = ((strchr(opt,'H') || strchr(opt,'h'))) ? kTRUE : kFALSE;
   if (help) {
      
      Printf("+++");
      Printf("+++ Options: \"A\" show all queries known to server");
      Printf("+++          \"L\" show retrieved queries");
      Printf("+++          \"F\" full listing of query info");
      Printf("+++          \"H\" print this menu");
      Printf("+++");
      Printf("+++ (case insensitive)");
      Printf("+++");
      Printf("+++ Use Retrieve(<#>) to retrieve the full"
             " query results from the master");
      Printf("+++     e.g. Retrieve(8)");
      Printf("+++");
      return;
   }
   if (!IsValid()) return;
   Bool_t local = ((strchr(opt,'L') || strchr(opt,'l'))) ? kTRUE : kFALSE;
   TObject *pq = 0;
   if (!local) {
      GetListOfQueries(opt);
      if (!fQueries) return;
      TIter nxq(fQueries);
      
      if (fOtherQueries > 0) {
         Printf("+++");
         Printf("+++ Queries processed during other sessions: %d", fOtherQueries);
         Int_t nq = 0;
         while (nq++ < fOtherQueries && (pq = nxq()))
            pq->Print(opt);
      }
      
      Printf("+++");
      Printf("+++ Queries processed during this session: selector: %d, draw: %d",
              GetNumberOfQueries(), fDrawQueries);
      while ((pq = nxq()))
         pq->Print(opt);
   } else {
      
      Printf("+++");
      Printf("+++ Queries processed during this session: selector: %d, draw: %d",
              GetNumberOfQueries(), fDrawQueries);
      
      TList *listlocal = fPlayer->GetListOfResults();
      if (listlocal) {
         Printf("+++");
         Printf("+++ Queries available locally: %d", listlocal->GetSize());
         TIter nxlq(listlocal);
         while ((pq = nxlq()))
            pq->Print(opt);
      }
   }
   Printf("+++");
}
Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
{
   
   if (!IsValid()) return kFALSE;
   TList submasters;
   TIter nextSlave(GetListOfActiveSlaves());
   while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
      if (sl->GetSlaveType() == TSlave::kMaster) {
         submasters.Add(sl);
      }
   }
   fDataReady = kTRUE; 
   fBytesReady = 0;
   fTotalBytes = 0;
   
   if (submasters.GetSize() > 0) {
      Broadcast(kPROOF_DATA_READY, &submasters);
      Collect(&submasters);
   }
   bytesready = fBytesReady;
   totalbytes = fTotalBytes;
   EmitVA("IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
   
   Info("IsDataReady", "%lld / %lld (%s)",
        bytesready, totalbytes, fDataReady?"READY":"NOT READY");
   return fDataReady;
}
void TProof::Interrupt(EUrgent type, ESlaves list)
{
   
   if (!IsValid()) return;
   TList *slaves = 0;
   if (list == kAll)       slaves = fSlaves;
   if (list == kActive)    slaves = fActiveSlaves;
   if (list == kUnique)    slaves = fUniqueSlaves;
   if (list == kAllUnique) slaves = fAllUniqueSlaves;
   if (slaves->GetSize() == 0) return;
   TSlave *sl;
   TIter   next(slaves);
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         
         sl->Interrupt((Int_t)type);
      }
   }
}
Int_t TProof::GetParallel() const
{
   
   
   if (!IsValid()) return -1;
   
   TIter nextSlave(GetListOfActiveSlaves());
   Int_t nparallel = 0;
   while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
      if (sl->GetParallel() >= 0)
         nparallel += sl->GetParallel();
   return nparallel;
}
TList *TProof::GetListOfSlaveInfos()
{
   
   if (!IsValid()) return 0;
   if (fSlaveInfo == 0) {
      fSlaveInfo = new TSortedList(kSortDescending);
      fSlaveInfo->SetOwner();
   } else {
      fSlaveInfo->Delete();
   }
   TList masters;
   TIter next(GetListOfSlaves());
   TSlave *slave;
   while ((slave = (TSlave *) next()) != 0) {
      if (slave->GetSlaveType() == TSlave::kSlave) {
         TSlaveInfo *slaveinfo = new TSlaveInfo(slave->GetOrdinal(),
                                                slave->GetName(),
                                                slave->GetPerfIdx());
         fSlaveInfo->Add(slaveinfo);
         TIter nextactive(GetListOfActiveSlaves());
         TSlave *activeslave;
         while ((activeslave = (TSlave *) nextactive())) {
            if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
               slaveinfo->SetStatus(TSlaveInfo::kActive);
               break;
            }
         }
         TIter nextbad(GetListOfBadSlaves());
         TSlave *badslave;
         while ((badslave = (TSlave *) nextbad())) {
            if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
               slaveinfo->SetStatus(TSlaveInfo::kBad);
               break;
            }
         }
      } else if (slave->GetSlaveType() == TSlave::kMaster) {
         if (slave->IsValid()) {
            if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
               MarkBad(slave);
            else
               masters.Add(slave);
         }
      } else {
         Error("GetSlaveInfo", "TSlave is neither Master nor Slave");
         R__ASSERT(0);
      }
   }
   if (masters.GetSize() > 0) Collect(&masters);
   return fSlaveInfo;
}
void TProof::Activate(TList *slaves)
{
   
   TMonitor *mon = fAllMonitor;
   mon->DeActivateAll();
   slaves = !slaves ? fActiveSlaves : slaves;
   TIter next(slaves);
   TSlave *sl;
   while ((sl = (TSlave*) next())) {
      if (sl->IsValid())
         mon->Activate(sl->GetSocket());
   }
}
Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
{
   
   
   
   if (!IsValid()) return -1;
   if (slaves->GetSize() == 0) return 0;
   int   nsent = 0;
   TIter next(slaves);
   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->GetSocket()->Send(mess) == -1)
            MarkBad(sl);
         else
            nsent++;
      }
   }
   return nsent;
}
Int_t TProof::Broadcast(const TMessage &mess, ESlaves list)
{
   
   
   
   TList *slaves = 0;
   if (list == kAll)       slaves = fSlaves;
   if (list == kActive)    slaves = fActiveSlaves;
   if (list == kUnique)    slaves = fUniqueSlaves;
   if (list == kAllUnique) slaves = fAllUniqueSlaves;
   return Broadcast(mess, slaves);
}
Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
{
   
   
   
   TMessage mess(kind);
   if (str) mess.WriteString(str);
   return Broadcast(mess, slaves);
}
Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
{
   
   
   
   
   TMessage mess(kind);
   if (str) mess.WriteString(str);
   return Broadcast(mess, list);
}
Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, TList *slaves)
{
   
   
   
   TMessage mess(kind);
   mess.WriteObject(obj);
   return Broadcast(mess, slaves);
}
Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, ESlaves list)
{
   
   
   
   TMessage mess(kind);
   mess.WriteObject(obj);
   return Broadcast(mess, list);
}
Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
{
   
   
   
   if (!IsValid()) return -1;
   if (slaves->GetSize() == 0) return 0;
   int   nsent = 0;
   TIter next(slaves);
   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->GetSocket()->SendRaw(buffer, length) == -1)
            MarkBad(sl);
         else
            nsent++;
      }
   }
   return nsent;
}
Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
{
   
   
   
   TList *slaves = 0;
   if (list == kAll)       slaves = fSlaves;
   if (list == kActive)    slaves = fActiveSlaves;
   if (list == kUnique)    slaves = fUniqueSlaves;
   if (list == kAllUnique) slaves = fAllUniqueSlaves;
   return BroadcastRaw(buffer, length, slaves);
}
Int_t TProof::Collect(const TSlave *sl, Long_t timeout)
{
   
   
   
   
   if (!sl->IsValid()) return 0;
   TMonitor *mon = fAllMonitor;
   mon->DeActivateAll();
   mon->Activate(sl->GetSocket());
   return Collect(mon, timeout);
}
Int_t TProof::Collect(TList *slaves, Long_t timeout)
{
   
   
   
   
   TMonitor *mon = fAllMonitor;
   mon->DeActivateAll();
   TIter next(slaves);
   TSlave *sl;
   while ((sl = (TSlave*) next())) {
      if (sl->IsValid())
         mon->Activate(sl->GetSocket());
   }
   return Collect(mon, timeout);
}
Int_t TProof::Collect(ESlaves list, Long_t timeout)
{
   
   
   
   
   TMonitor *mon = 0;
   if (list == kAll)       mon = fAllMonitor;
   if (list == kActive)    mon = fActiveMonitor;
   if (list == kUnique)    mon = fUniqueMonitor;
   if (list == kAllUnique) mon = fAllUniqueMonitor;
   mon->ActivateAll();
   return Collect(mon, timeout);
}
Int_t TProof::Collect(TMonitor *mon, Long_t timeout)
{
   
   
   
   
   fStatus = 0;
   if (!mon->GetActive()) return 0;
   DeActivateAsyncInput();
   
   fCurrentMonitor = mon;
   
   
   Bool_t saveRedirLog = fRedirLog;
   if (!IsIdle() && !IsSync())
      fRedirLog = kFALSE;
   int cnt = 0, rc = 0;
   fBytesRead = 0;
   fRealTime  = 0.0;
   fCpuTime   = 0.0;
   
   Long_t nto = timeout;
   if (gDebug > 2)
      Info("Collect","active: %d", mon->GetActive());
   
   if (fIntHandler)
      fIntHandler->Add();
   while (mon->GetActive() && (nto < 0 || nto > 0)) {
      
      TSocket *s = mon->Select(1000);
      if (s && s != (TSocket *)(-1)) {
         
         if ((rc = CollectInputFrom(s)) == 1) {
            
            mon->DeActivate(s);
            if (gDebug > 2)
               Info("Collect","deactivating %p (active: %d, %p)",
                              s, mon->GetActive(),
                              mon->GetListOfActives()->First());
         }
         
         if (rc >= 0)
            cnt++;
      } else {
         
         
         
         if (!s)
            if (fPlayer && (fPlayer->GetExitStatus() == TVirtualProofPlayer::kFinished))
               mon->DeActivateAll();
         
         if (s == (TSocket *)(-1) && nto > 0)
            nto--;
      }
   }
   
   if (nto == 0)
      mon->DeActivateAll();
   
   if (fIntHandler)
      fIntHandler->Remove();
   
   SendGroupView();
   
   fRedirLog = saveRedirLog;
   
   fCurrentMonitor = 0;
   ActivateAsyncInput();
   return cnt;
}
R__HIDDEN void TProof::CleanGDirectory(TList *ol)
{
   
   if (ol) {
      TIter nxo(ol);
      TObject *o = 0;
      while ((o = nxo()))
         gDirectory->RecursiveRemove(o);
   }
}
Int_t TProof::CollectInputFrom(TSocket *s)
{
   
   
   TMessage *mess;
   Int_t rc = 0;
   char      str[512];
   TSlave   *sl;
   TObject  *obj;
   Int_t     what;
   Bool_t    delete_mess = kTRUE;
   if (s->Recv(mess) < 0) {
      MarkBad(s);
      return -1;
   }
   if (!mess) {
      
      MarkBad(s);
      return -1;
   }
   what = mess->What();
   PDB(kGlobal,3) {
      sl = FindSlave(s);
      Info("CollectInputFrom","got %d from %s", what, (sl ? sl->GetOrdinal() : "undef"));
   }
   switch (what) {
      case kMESS_OBJECT:
         fPlayer->HandleRecvHisto(mess);
         break;
      case kPROOF_FATAL:
         MarkBad(s);
         break;
      case kPROOF_GETOBJECT:
         
         mess->ReadString(str, sizeof(str));
         obj = gDirectory->Get(str);
         if (obj)
            s->SendObject(obj);
         else
            s->Send(kMESS_NOTOK);
         break;
      case kPROOF_GETPACKET:
         {
            TDSetElement *elem = 0;
            sl = FindSlave(s);
            elem = fPlayer->GetNextPacket(sl, mess);
            if (elem != (TDSetElement*) -1) {
               TMessage answ(kPROOF_GETPACKET);
               answ << elem;
               s->Send(answ);
               while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
                  TPair *p = (TPair*) fWaitingSlaves->First();
                  s = (TSocket*) p->Key();
                  sl = FindSlave(s);
                  TMessage *m = (TMessage*) p->Value();
                  elem = fPlayer->GetNextPacket(sl, m);
                  if (elem != (TDSetElement*) -1) {
                     TMessage a(kPROOF_GETPACKET);
                     a << elem;
                     s->Send(a);
                     
                     
                     
                     fWaitingSlaves->Remove(fWaitingSlaves->FirstLink());
                     delete p;
                     delete m;
                  } else {
                     break;
                  }
               }
            } else {
               if (fWaitingSlaves == 0) fWaitingSlaves = new TList;
               fWaitingSlaves->Add(new TPair(s, mess));
               delete_mess = kFALSE;
            }
         }
         break;
      case kPROOF_LOGFILE:
         {
            Int_t size;
            (*mess) >> size;
            RecvLogFile(s, size);
         }
         break;
      case kPROOF_LOGDONE:
         sl = FindSlave(s);
         (*mess) >> sl->fStatus >> sl->fParallel;
         PDB(kGlobal,2)
            Info("CollectInputFrom","kPROOF_LOGDONE:%s: status %d  parallel %d",
                 sl->GetOrdinal(), sl->fStatus, sl->fParallel);
         if (sl->fStatus != 0) fStatus = sl->fStatus; 
         rc = 1;
         break;
      case kPROOF_GETSTATS:
         sl = FindSlave(s);
         (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
                 >> sl->fWorkDir >> sl->fProofWorkDir;
         fBytesRead += sl->fBytesRead;
         fRealTime  += sl->fRealTime;
         fCpuTime   += sl->fCpuTime;
         rc = 1;
         break;
      case kPROOF_GETPARALLEL:
         sl = FindSlave(s);
         (*mess) >> sl->fParallel;
         rc = 1;
         break;
      case kPROOF_PACKAGE_LIST:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_PACKAGE_LIST: enter");
            Int_t type = 0;
            (*mess) >> type;
            switch (type) {
            case TProof::kListEnabledPackages:
               SafeDelete(fEnabledPackages);
               fEnabledPackages = (TList *) mess->ReadObject(TList::Class());
               fEnabledPackages->SetOwner();
               break;
            case TProof::kListPackages:
               SafeDelete(fAvailablePackages);
               fAvailablePackages = (TList *) mess->ReadObject(TList::Class());
               fAvailablePackages->SetOwner();
               break;
            default:
               Info("CollectInputFrom","kPROOF_PACKAGE_LIST: unknown type: %d", type);
            }
         }
         break;
      case kPROOF_OUTPUTOBJECT:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_OUTPUTOBJECT: enter");
            Int_t type = 0;
            (*mess) >> type;
            
            if (type == 0) {
               
               TQueryResult *pq =
                  (TQueryResult *) mess->ReadObject(TQueryResult::Class());
               if (pq) {
                  
                  fPlayer->AddQueryResult(pq);
                  fPlayer->SetCurrentQuery(pq);
                  
                  
                  fPlayer->AddInput(new TNamed("PROOF_QueryTag",
                                    Form("%s:%s",pq->GetTitle(),pq->GetName())));
               } else {
                  Warning("CollectInputFrom","kPROOF_OUTPUTOBJECT: query result missing");
               }
            } else if (type > 0) {
               
               TObject *obj = mess->ReadObject(TObject::Class());
               
               if ((fPlayer->AddOutputObject(obj) == 1))
                  
                  SafeDelete(obj);
               if (type > 1 && !IsMaster()) {
                  TQueryResult *pq = fPlayer->GetCurrentQuery();
                  pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
                  pq->SetInputList(fPlayer->GetInputList(), kFALSE);
                  
                  QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
               }
            }
         }
         break;
      case kPROOF_OUTPUTLIST:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_OUTPUTLIST: enter");
            TList *out = 0;
            if (IsMaster() || fProtocol < 7) {
               out = (TList *) mess->ReadObject(TList::Class());
            } else {
               TQueryResult *pq =
                  (TQueryResult *) mess->ReadObject(TQueryResult::Class());
               if (pq) {
                  
                  fPlayer->AddQueryResult(pq);
                  fPlayer->SetCurrentQuery(pq);
                  
                  
                  out = pq->GetOutputList();
                  CleanGDirectory(out);
                  out = (TList *) out->Clone();
                  
                  QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
               } else {
                  PDB(kGlobal,2)
                     Info("CollectInputFrom","kPROOF_OUTPUTLIST: query result missing");
               }
            }
            if (out) {
               out->SetOwner();
               fPlayer->AddOutput(out); 
               SafeDelete(out);
            } else {
               PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_OUTPUTLIST: ouputlist is empty");
            }
            
            if (!IsMaster()) {
               
               if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
                  if (fSync)
                     Info("CollectInputFrom",
                          "processing was aborted - %lld events processed",
                          fPlayer->GetEventsProcessed());
                  if (GetRemoteProtocol() > 11) {
                     
                     Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
                  } else {
                     Progress(-1, fPlayer->GetEventsProcessed());
                  }
                  Emit("StopProcess(Bool_t)", kTRUE);
               }
               
               if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kStopped) {
                  if (fSync)
                     Info("CollectInputFrom",
                          "processing was stopped - %lld events processed",
                          fPlayer->GetEventsProcessed());
                  if (GetRemoteProtocol() > 11) {
                     
                     Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
                  } else {
                     Progress(-1, fPlayer->GetEventsProcessed());
                  }
                  Emit("StopProcess(Bool_t)", kFALSE);
               }
               
               if (GetRemoteProtocol() > 11) {
                  
                  EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,)",
                          7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
                             (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
               } else {
                  EmitVA("Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
               }
            }
         }
         break;
      case kPROOF_QUERYLIST:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_QUERYLIST: enter");
            (*mess) >> fOtherQueries >> fDrawQueries;
            if (fQueries) {
               fQueries->Delete();
               delete fQueries;
               fQueries = 0;
            }
            fQueries = (TList *) mess->ReadObject(TList::Class());
         }
         break;
      case kPROOF_RETRIEVE:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_RETRIEVE: enter");
            TQueryResult *pq =
               (TQueryResult *) mess->ReadObject(TQueryResult::Class());
            if (pq) {
               fPlayer->AddQueryResult(pq);
               
               QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
            } else {
               PDB(kGlobal,2)
                  Info("CollectInputFrom","kPROOF_RETRIEVE: query result missing");
            }
         }
         break;
      case kPROOF_MAXQUERIES:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_MAXQUERIES: enter");
            Int_t max = 0;
            (*mess) >> max;
            Printf("Number of queries fully kept remotely: %d", max);
         }
         break;
      case kPROOF_SERVERSTARTED:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_SERVERSTARTED: enter");
            UInt_t tot = 0, done = 0;
            TString action;
            Bool_t st = kTRUE;
            (*mess) >> action >> tot >> done >> st;
            if (!IsMaster()) {
               if (tot) {
                  TString type = (action.Contains("submas")) ? "submasters"
                                                             : "workers";
                  Int_t frac = (Int_t) (done*100.)/tot;
                  char msg[512] = {0};
                  if (frac >= 100) {
                     sprintf(msg,"%s: OK (%d %s)                 \n",
                             action.Data(),tot, type.Data());
                  } else {
                     sprintf(msg,"%s: %d out of %d (%d %%)\r",
                             action.Data(), done, tot, frac);
                  }
                  if (fSync)
                     fprintf(stderr,"%s", msg);
                  else
                     NotifyLogMsg(msg, 0);
               }
               
               StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
            } else {
               
               TMessage m(kPROOF_SERVERSTARTED);
               m << action << tot << done << st;
               gProofServ->GetSocket()->Send(m);
            }
         }
         break;
      case kPROOF_DATASET_STATUS:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_DATASET_STATUS: enter");
            UInt_t tot = 0, done = 0;
            TString action;
            Bool_t st = kTRUE;
            (*mess) >> action >> tot >> done >> st;
            if (!IsMaster()) {
               if (tot) {
                  TString type = "files";
                  Int_t frac = (Int_t) (done*100.)/tot;
                  char msg[512] = {0};
                  if (frac >= 100) {
                     sprintf(msg,"%s: OK (%d %s)                 \n",
                             action.Data(),tot, type.Data());
                  } else {
                     sprintf(msg,"%s: %d out of %d (%d %%)\r",
                             action.Data(), done, tot, frac);
                  }
                  if (fSync)
                     fprintf(stderr,"%s", msg);
                  else
                     NotifyLogMsg(msg, 0);
               }
               
               DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
            } else {
               
               TMessage m(kPROOF_DATASET_STATUS);
               m << action << tot << done << st;
               gProofServ->GetSocket()->Send(m);
            }
         }
         break;
      case kPROOF_STARTPROCESS:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_STARTPROCESS: enter");
            fIdle = kFALSE;
            TString selec;
            Int_t dsz = -1;
            Long64_t first = -1, nent = -1;
            (*mess) >> selec >> dsz >> first >> nent;
            
            if (fProgressDialog && !TestBit(kUsingSessionGui)) {
               if (!fProgressDialogStarted) {
                  fProgressDialog->ExecPlugin(5, this,
                                              selec.Data(), dsz, first, nent);
                  fProgressDialogStarted = kTRUE;
               } else {
                  ResetProgressDialog(selec, dsz, first, nent);
               }
            }
            ResetBit(kUsingSessionGui);
         }
         break;
      case kPROOF_SETIDLE:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_SETIDLE: enter");
            
            fIdle = kTRUE;
         }
         break;
      case kPROOF_QUERYSUBMITTED:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_QUERYSUBMITTED: enter");
            
            (*mess) >> fSeqNum;
            rc = 1;
         }
         break;
      case kPROOF_SESSIONTAG:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_SESSIONTAG: enter");
            
            TString stag;
            (*mess) >> stag;
            SetName(stag);
         }
         break;
      case kPROOF_FEEDBACK:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_FEEDBACK: enter");
            TList *out = (TList *) mess->ReadObject(TList::Class());
            out->SetOwner();
            sl = FindSlave(s);
            if (fPlayer)
               fPlayer->StoreFeedback(sl, out); 
            else
               
               rc = 1;
         }
         break;
      case kPROOF_AUTOBIN:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_AUTOBIN: enter");
            TString name;
            Double_t xmin, xmax, ymin, ymax, zmin, zmax;
            (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
            fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
            TMessage answ(kPROOF_AUTOBIN);
            answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
            s->Send(answ);
         }
         break;
      case kPROOF_PROGRESS:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_PROGRESS: enter");
            sl = FindSlave(s);
            if (GetRemoteProtocol() > 11) {
               
               Long64_t total, processed, bytesread;
               Float_t initTime, procTime, evtrti, mbrti;
               (*mess) >> total >> processed >> bytesread
                       >> initTime >> procTime
                       >> evtrti >> mbrti;
               fPlayer->Progress(total, processed, bytesread,
                                 initTime, procTime, evtrti, mbrti);
            } else {
               
               Long64_t total, processed;
               (*mess) >> total >> processed;
               fPlayer->Progress(sl, total, processed);
            }
         }
         break;
      case kPROOF_STOPPROCESS:
         {
            
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_STOPPROCESS: enter");
            Long64_t events;
            Bool_t abort = kFALSE;
            if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
               (*mess) >> events >> abort;
            else
               (*mess) >> events;
            if (!abort) {
               fPlayer->AddEventsProcessed(events);
            } else if (IsMaster()) {
               fPlayer->StopProcess(kTRUE);
            }
            if (!IsMaster())
               Emit("StopProcess(Bool_t)", abort);
            break;
         }
      case kPROOF_GETSLAVEINFO:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_GETSLAVEINFO: enter");
            sl = FindSlave(s);
            Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
            Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
            TList* tmpinfo = 0;
            (*mess) >> tmpinfo;
            tmpinfo->SetOwner(kFALSE);
            Int_t nentries = tmpinfo->GetSize();
            for (Int_t i=0; i<nentries; i++) {
               TSlaveInfo* slinfo =
                  dynamic_cast<TSlaveInfo*>(tmpinfo->At(i));
               if (slinfo) {
                  fSlaveInfo->Add(slinfo);
                  if (slinfo->fStatus != TSlaveInfo::kBad) {
                     if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
                     if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
                  }
                  if (!sl->GetMsd().IsNull()) slinfo->fMsd = sl->GetMsd();
               }
            }
            delete tmpinfo;
            rc = 1;
         }
         break;
      case kPROOF_VALIDATE_DSET:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_VALIDATE_DSET: enter");
            TDSet* dset = 0;
            (*mess) >> dset;
            if (!fDSet)
               Error("CollectInputFrom","kPROOF_VALIDATE_DSET: fDSet not set");
            else
               fDSet->Validate(dset);
            delete dset;
         }
         break;
      case kPROOF_DATA_READY:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_DATA_READY: enter");
            Bool_t dataready = kFALSE;
            Long64_t totalbytes, bytesready;
            (*mess) >> dataready >> totalbytes >> bytesready;
            fTotalBytes += totalbytes;
            fBytesReady += bytesready;
            if (dataready == kFALSE) fDataReady = dataready;
         }
         break;
      case kPROOF_PING:
         
         break;
      case kPROOF_MESSAGE:
         {
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_MESSAGE: enter");
            
            TString msg;
            (*mess) >> msg;
            Bool_t lfeed = kTRUE;
            if ((mess->BufferSize() > mess->Length()))
               (*mess) >> lfeed;
            if (!IsMaster()) {
               if (fSync) {
                  
                  fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
               } else {
                  
                  NotifyLogMsg(msg, (lfeed ? "\n" : "\r"));
               }
            } else {
               
               fprintf(stderr,"%s%c", msg.Data(), (lfeed ? '\n' : '\r'));
               if (gProofServ) {
                  
                  gProofServ->FlushLogFile();
                  
                  gProofServ->SendAsynMessage(msg, lfeed);
               }
            }
         }
         break;
      case kPROOF_VERSARCHCOMP:
         {
            TString vac;
            (*mess) >> vac;
            PDB(kGlobal,2) Info("CollectInputFrom","kPROOF_VERSARCHCOMP: %s", vac.Data());
            Int_t from = 0;
            TString vers, archcomp;
            if (vac.Tokenize(vers, from, "|"))
               vac.Tokenize(archcomp, from, "|");
            if ((sl = FindSlave(s))) {
               sl->SetArchCompiler(archcomp);
               sl->SetROOTVersion(vers);
            }
         }
         break;
      default:
         Error("Collect", "unknown command received from slave (what = %d)", what);
         break;
   }
   
   if (delete_mess)
      delete mess;
   
   return rc;
}
void TProof::ActivateAsyncInput()
{
   
   TIter next(fSlaves);
   TSlave *sl;
   while ((sl = (TSlave*) next()))
      if (sl->GetInputHandler())
         sl->GetInputHandler()->Add();
}
void TProof::DeActivateAsyncInput()
{
   
   TIter next(fSlaves);
   TSlave *sl;
   while ((sl = (TSlave*) next()))
      if (sl->GetInputHandler())
         sl->GetInputHandler()->Remove();
}
void TProof::HandleAsyncInput(TSocket *sl)
{
   
   
   
   
   TMessage *mess;
   Int_t     what;
   if (sl->Recv(mess) <= 0)
      return;                
   what = mess->What();
   switch (what) {
      case kPROOF_PING:
         
         break;
      default:
         Error("HandleAsyncInput", "unknown command (what = %d)", what);
         break;
   }
   delete mess;
}
void TProof::MarkBad(TSlave *sl)
{
   
   
   fActiveSlaves->Remove(sl);
   FindUniqueSlaves();
   fBadSlaves->Add(sl);
   fAllMonitor->Remove(sl->GetSocket());
   fActiveMonitor->Remove(sl->GetSocket());
   sl->Close();
   fSendGroupView = kTRUE;
   
   SaveWorkerInfo();
}
void TProof::MarkBad(TSocket *s)
{
   
   
   TSlave *sl = FindSlave(s);
   MarkBad(sl);
}
Int_t TProof::Ping()
{
   
   return Ping(kActive);
}
Int_t TProof::Ping(ESlaves list)
{
   
   TList *slaves = 0;
   if (list == kAll)       slaves = fSlaves;
   if (list == kActive)    slaves = fActiveSlaves;
   if (list == kUnique)    slaves = fUniqueSlaves;
   if (list == kAllUnique) slaves = fAllUniqueSlaves;
   if (slaves->GetSize() == 0) return 0;
   int   nsent = 0;
   TIter next(slaves);
   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->Ping() == -1)
            MarkBad(sl);
         else
            nsent++;
      }
   }
   return nsent;
}
void TProof::Print(Option_t *option) const
{
   
   TString secCont;
   if (!IsMaster()) {
      Printf("Connected to:             %s (%s)", GetMaster(),
                                             IsValid() ? "valid" : "invalid");
      Printf("Port number:              %d", GetPort());
      Printf("User:                     %s", GetUser());
      Printf("ROOT version:             %s", gROOT->GetVersion());
      Printf("Architecture-Compiler:    %s-%s", gSystem->GetBuildArch(),
                                                gSystem->GetBuildCompilerVersion());
      TSlave *sl = (TSlave *)fActiveSlaves->First();
      if (sl) {
         TString sc;
         if (sl->GetSocket()->GetSecContext())
            Printf("Security context:         %s",
                                      sl->GetSocket()->GetSecContext()->AsString(sc));
         Printf("Proofd protocol version:  %d", sl->GetSocket()->GetRemoteProtocol());
      } else {
         Printf("Security context:         Error - No connection");
         Printf("Proofd protocol version:  Error - No connection");
      }
      Printf("Client protocol version:  %d", GetClientProtocol());
      Printf("Remote protocol version:  %d", GetRemoteProtocol());
      Printf("Log level:                %d", GetLogLevel());
      Printf("Session unique tag:       %s", IsValid() ? GetSessionTag() : "");
      Printf("Default data pool:        %s", IsValid() ? GetDataPoolUrl() : "");
      if (IsValid())
         const_cast<TProof*>(this)->SendPrint(option);
   } else {
      const_cast<TProof*>(this)->AskStatistics();
      if (IsParallel())
         Printf("*** Master server %s (parallel mode, %d slaves):",
                gProofServ->GetOrdinal(), GetParallel());
      else
         Printf("*** Master server %s (sequential mode):",
                gProofServ->GetOrdinal());
      Printf("Master host name:           %s", gSystem->HostName());
      Printf("Port number:                %d", GetPort());
      if (strlen(gProofServ->GetGroup()) > 0) {
         Printf("User/Group:                 %s/%s", GetUser(), gProofServ->GetGroup());
      } else {
         Printf("User:                       %s", GetUser());
      }
      if (gSystem->Getenv("ROOTVERSIONTAG"))
         Printf("ROOT version:               %s-%s", gROOT->GetVersion(),
                                                     gSystem->Getenv("ROOTVERSIONTAG"));
      else
         Printf("ROOT version:               %s", gROOT->GetVersion());
      Printf("Architecture-Compiler:      %s-%s", gSystem->GetBuildArch(),
                                                  gSystem->GetBuildCompilerVersion());
      Printf("Protocol version:           %d", GetClientProtocol());
      Printf("Image name:                 %s", GetImage());
      Printf("Working directory:          %s", gSystem->WorkingDirectory());
      Printf("Config directory:           %s", GetConfDir());
      Printf("Config file:                %s", GetConfFile());
      Printf("Log level:                  %d", GetLogLevel());
      Printf("Number of workers:          %d", GetNumberOfSlaves());
      Printf("Number of active workers:   %d", GetNumberOfActiveSlaves());
      Printf("Number of unique workers:   %d", GetNumberOfUniqueSlaves());
      Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
      Printf("Number of bad workers:      %d", GetNumberOfBadSlaves());
      Printf("Total MB's processed:       %.2f", float(GetBytesRead())/(1024*1024));
      Printf("Total real time used (s):   %.3f", GetRealTime());
      Printf("Total CPU time used (s):    %.3f", GetCpuTime());
      if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
         Printf("List of workers:");
         TList masters;
         TIter nextslave(fSlaves);
         while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
            if (!sl->IsValid()) continue;
            if (sl->GetSlaveType() == TSlave::kSlave) {
               sl->Print(option);
            } else if (sl->GetSlaveType() == TSlave::kMaster) {
               TMessage mess(kPROOF_PRINT);
               mess.WriteString(option);
               if (sl->GetSocket()->Send(mess) == -1)
                  const_cast<TProof*>(this)->MarkBad(sl);
               else
                  masters.Add(sl);
            } else {
               Error("Print", "TSlave is neither Master nor Worker");
               R__ASSERT(0);
            }
         }
         const_cast<TProof*>(this)->Collect(&masters);
      }
   }
}
Long64_t TProof::Process(TDSet *dset, const char *selector, Option_t *option,
                         Long64_t nentries, Long64_t first, TEventList *evl)
{
   
   
   
   if (!IsValid()) return -1;
   
   fSync = (GetQueryMode(option) == kSync);
   if (fSync && !IsIdle()) {
      Info("Process","not idle, cannot submit synchronous query");
      return -1;
   }
   
   
   TSignalHandler *sh = 0;
   if (fSync) {
      if (gApplication)
         sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
   }
   Long64_t rv = fPlayer->Process(dset, selector, option, nentries, first, evl);
   if (fSync) {
      
      if (sh)
         gSystem->AddSignalHandler(sh);
   }
   return rv;
}
Long64_t TProof::Process(const char *dsetname, const char *selector,
                         Option_t *option, Long64_t nentries,
                         Long64_t first, TEventList *evl)
{
   
   
   
   
   
   
   
   
   
   
   
   
   if (fProtocol < 13) {
      Info("Process", "processing 'by name' not supported by the server");
      return -1;
   }
   TString name(dsetname);
   TString obj;
   TString dir = "/";
   Int_t idxc = name.Index("#");
   if (idxc != kNPOS) {
      Int_t idxs = name.Index("/", 1, idxc, TString::kExact);
      if (idxs != kNPOS && idxc != kNPOS) {
         obj = name(idxs+1, name.Length());
         dir = name(idxc+1, name.Length());
         dir.Remove(dir.Index("/") + 1);
         name.Remove(idxc);
      } else if (idxc != kNPOS && idxs == kNPOS) {
         obj = name(idxc+1, name.Length());
         name.Remove(idxc);
      } else if (idxs != kNPOS && idxc == kNPOS) {
         Error("Process", "bad name syntax (%s): specification of additional"
                          " attributes needs a '#' after the dataset name", dsetname);
         return -1;
      }
   } else if (name.Index(":") != kNPOS && name.Index("://") == kNPOS) {
      
      Error("Process", "bad name syntax (%s): please use"
                       " a '#' after the dataset name", dsetname);
      return -1;
   }
   TDSet *dset = new TDSet(name, obj, dir);
   Long64_t retval = Process(dset, selector, option, nentries, first, evl);
   delete dset;
   return retval;
}
Int_t TProof::GetQueryReference(Int_t qry, TString &ref)
{
   
   
   ref = "";
   if (qry > 0) {
      if (!fQueries)
         GetListOfQueries();
      if (fQueries) {
         TIter nxq(fQueries);
         TQueryResult *qr = 0;
         while ((qr = (TQueryResult *) nxq()))
            if (qr->GetSeqNum() == qry) {
               ref = Form("%s:%s", qr->GetTitle(), qr->GetName());
               return 0;
            }
      }
   }
   return -1;
}
Long64_t TProof::Finalize(Int_t qry, Bool_t force)
{
   
   
   
   
   
   if (fPlayer) {
      if (qry > 0) {
         TString ref;
         if (GetQueryReference(qry, ref) == 0) {
            return Finalize(ref, force);
         } else {
            Info("Finalize", "query #%d not found", qry);
         }
      } else {
         
         return fPlayer->Finalize(force);
      }
   }
   return -1;
}
Long64_t TProof::Finalize(const char *ref, Bool_t force)
{
   
   
   
   
   
   if (fPlayer) {
      if (ref) {
         
         TQueryResult *qr = fPlayer->GetQueryResult(ref);
         
         Bool_t retrieve = kFALSE;
         if (!qr) {
            retrieve = kTRUE;
         } else {
            if (qr->IsFinalized()) {
               if (force) {
                  retrieve = kTRUE;
               } else {
                  Info("Finalize","query already finalized:"
                       " use Finalize(<qry>,kTRUE) to force new retrieval");
                  qr = 0;
               }
            }
         }
         if (retrieve) {
            Retrieve(ref);
            qr = fPlayer->GetQueryResult(ref);
         }
         if (qr)
            return fPlayer->Finalize(qr);
      }
   }
   return -1;
}
Int_t TProof::Retrieve(Int_t qry, const char *path)
{
   
   
   if (qry > 0) {
      TString ref;
      if (GetQueryReference(qry, ref) == 0)
         return Retrieve(ref, path);
      else
         Info("Retrieve", "query #%d not found", qry);
   } else {
      Info("Retrieve","positive argument required - do nothing");
   }
   return -1;
}
Int_t TProof::Retrieve(const char *ref, const char *path)
{
   
   
   
   if (ref) {
      TMessage m(kPROOF_RETRIEVE);
      m << TString(ref);
      Broadcast(m, kActive);
      Collect(kActive);
      
      if (path) {
         
         TQueryResult *qr = fPlayer ? fPlayer->GetQueryResult(ref) : 0;
         if (qr) {
            TFile *farc = TFile::Open(path,"UPDATE");
            if (!(farc->IsOpen())) {
               Info("Retrieve", "archive file cannot be open (%s)", path);
               return 0;
            }
            farc->cd();
            
            qr->SetArchived(path);
            
            qr->Write();
            farc->Close();
            SafeDelete(farc);
         } else {
            Info("Retrieve", "query not found after retrieve");
            return -1;
         }
      }
      return 0;
   }
   return -1;
}
Int_t TProof::Remove(Int_t qry, Bool_t all)
{
   
   if (qry > 0) {
      TString ref;
      if (GetQueryReference(qry, ref) == 0)
         return Remove(ref, all);
      else
         Info("Remove", "query #%d not found", qry);
   } else {
      Info("Remove","positive argument required - do nothing");
   }
   return -1;
}
Int_t TProof::Remove(const char *ref, Bool_t all)
{
   
   
   
   
   
   if (all) {
      
      if (fPlayer)
         fPlayer->RemoveQueryResult(ref);
   }
   if (ref) {
      TMessage m(kPROOF_REMOVE);
      m << TString(ref);
      Broadcast(m, kActive);
      Collect(kActive);
      return 0;
   }
   return -1;
}
Int_t TProof::Archive(Int_t qry, const char *path)
{
   
   if (qry > 0) {
      TString ref;
      if (GetQueryReference(qry, ref) == 0)
         return Archive(ref, path);
      else
         Info("Archive", "query #%d not found", qry);
   } else {
      Info("Archive","positive argument required - do nothing");
   }
   return -1;
}
Int_t TProof::Archive(const char *ref, const char *path)
{
   
   
   
   
   if (ref) {
      TMessage m(kPROOF_ARCHIVE);
      m << TString(ref) << TString(path);
      Broadcast(m, kActive);
      Collect(kActive);
      return 0;
   }
   return -1;
}
Int_t TProof::CleanupSession(const char *sessiontag)
{
   
   if (sessiontag) {
      TMessage m(kPROOF_CLEANUPSESSION);
      m << TString(sessiontag);
      Broadcast(m, kActive);
      Collect(kActive);
      return 0;
   }
   return -1;
}
void TProof::SetQueryMode(EQueryMode mode)
{
   
   fQueryMode = mode;
   if (gDebug > 0)
      Info("SetQueryMode","query mode is set to: %s", fQueryMode == kSync ?
           "Sync" : "Async");
}
TProof::EQueryMode TProof::GetQueryMode(Option_t *mode) const
{
   
   EQueryMode qmode = fQueryMode;
   if (mode && (strlen(mode) > 0)) {
      TString m(mode);
      m.ToUpper();
      if (m.Contains("ASYN")) {
         qmode = kAsync;
      } else if (m.Contains("SYNC")) {
         qmode = kSync;
      }
   }
   if (gDebug > 0)
      Info("GetQueryMode","query mode is set to: %s", qmode == kSync ?
           "Sync" : "Async");
   return qmode;
}
Long64_t TProof::DrawSelect(TDSet *dset, const char *varexp,
                            const char *selection, Option_t *option,
                            Long64_t nentries, Long64_t first)
{
   
   
   if (!IsValid()) return -1;
   
   if (!IsIdle()) {
      Info("DrawSelect","not idle, asynchronous Draw not supported");
      return -1;
   }
   TString opt(option);
   Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
   if (idx != kNPOS)
      opt.Replace(idx,4,"");
   return fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
}
void TProof::StopProcess(Bool_t abort, Int_t timeout)
{
   
   PDB(kGlobal,2)
      Info("StopProcess","enter %d", abort);
   if (!IsValid())
      return;
   if (fPlayer)
      fPlayer->StopProcess(abort, timeout);
   
   if (!IsMaster())
      InterruptCurrentMonitor();
   if (fSlaves->GetSize() == 0)
      return;
   
   TSlave *sl;
   TIter   next(fSlaves);
   while ((sl = (TSlave *)next()))
      if (sl->IsValid())
         
         sl->StopProcess(abort, timeout);
}
void TProof::RecvLogFile(TSocket *s, Int_t size)
{
   
   const Int_t kMAXBUF = 16384;  
   char buf[kMAXBUF];
   
   Int_t fdout = -1;
   if (!fLogToWindowOnly) {
      fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
      if (fdout < 0) {
         Warning("RecvLogFile", "file descriptor for outputs undefined (%d):"
                 " will not log msgs", fdout);
         return;
      }
      lseek(fdout, (off_t) 0, SEEK_END);
   }
   Int_t  left, rec, r;
   Long_t filesize = 0;
   while (filesize < size) {
      left = Int_t(size - filesize);
      if (left > kMAXBUF)
         left = kMAXBUF;
      rec = s->RecvRaw(&buf, left);
      filesize = (rec > 0) ? (filesize + rec) : filesize;
      if (!fLogToWindowOnly) {
         if (rec > 0) {
            char *p = buf;
            r = rec;
            while (r) {
               Int_t w;
               w = write(fdout, p, r);
               if (w < 0) {
                  SysError("RecvLogFile", "error writing to unit: %d", fdout);
                  break;
               }
               r -= w;
               p += w;
            }
         } else if (rec < 0) {
            Error("RecvLogFile", "error during receiving log file");
            break;
         }
      }
      if (rec > 0) {
         buf[rec] = 0;
         EmitVA("LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
      }
   }
   
   if (fRedirLog && IsIdle())
      fRedirLog = kFALSE;
}
void TProof::NotifyLogMsg(const char *msg, const char *sfx)
{
   
   
   
   Int_t len = 0;
   if (!msg || (len = strlen(msg)) <= 0)
      return;
   
   Int_t lsfx = (sfx) ? strlen(sfx) : 0;
   
   Int_t fdout = -1;
   if (!fLogToWindowOnly) {
      fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
      if (fdout < 0) {
         Warning("NotifyLogMsg", "file descriptor for outputs undefined (%d):"
                 " will not notify msgs", fdout);
         return;
      }
      lseek(fdout, (off_t) 0, SEEK_END);
   }
   if (!fLogToWindowOnly) {
      
      if (len > 0) {
         char *p = (char *)msg;
         Int_t r = len;
         while (r) {
            Int_t w = write(fdout, p, r);
            if (w < 0) {
               SysError("NotifyLogMsg", "error writing to unit: %d", fdout);
               break;
            }
            r -= w;
            p += w;
         }
         
         if (lsfx > 0)
            write(fdout, sfx, lsfx);
      }
   }
   if (len > 0) {
      
      
      EmitVA("LogMessage(const char*,Bool_t)", 2, msg, kFALSE);
   }
   
   if (fRedirLog && IsIdle())
      fRedirLog = kFALSE;
}
void TProof::LogMessage(const char *msg, Bool_t all)
{
   
   PDB(kGlobal,1)
      Info("LogMessage","Enter ... %s, 'all: %s", msg ? msg : "",
           all ? "true" : "false");
   if (gROOT->IsBatch()) {
      PDB(kGlobal,1) Info("LogMessage","GUI not started - use TProof::ShowLog()");
      return;
   }
   if (msg)
      EmitVA("LogMessage(const char*,Bool_t)", 2, msg, all);
   
   
   
   if (all)
      lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
   const Int_t kMAXBUF = 32768;
   char buf[kMAXBUF];
   Int_t len;
   do {
      while ((len = read(fileno(fLogFileR), buf, kMAXBUF-1)) < 0 &&
             TSystem::GetErrno() == EINTR)
         TSystem::ResetErrno();
      if (len < 0) {
         Error("LogMessage", "error reading log file");
         break;
      }
      if (len > 0) {
         buf[len] = 0;
         EmitVA("LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
      }
   } while (len > 0);
}
Int_t TProof::SendGroupView()
{
   
   
   
   if (!IsValid()) return -1;
   if (!IsMaster()) return 0;
   if (!fSendGroupView) return 0;
   fSendGroupView = kFALSE;
   TIter   next(fActiveSlaves);
   TSlave *sl;
   int  bad = 0, cnt = 0, size = GetNumberOfActiveSlaves();
   char str[32];
   while ((sl = (TSlave *)next())) {
      sprintf(str, "%d %d", cnt, size);
      if (sl->GetSocket()->Send(str, kPROOF_GROUPVIEW) == -1) {
         MarkBad(sl);
         bad++;
      } else
         cnt++;
   }
   
   
   if (bad) SendGroupView();
   return GetNumberOfActiveSlaves();
}
Int_t TProof::Exec(const char *cmd, Bool_t plusMaster)
{
   
   
   
   
   
   
   return Exec(cmd, kActive, plusMaster);
}
R__HIDDEN Int_t TProof::Exec(const char *cmd, ESlaves list, Bool_t plusMaster)
{
   
   
   
   
   
   if (!IsValid()) return -1;
   TString s = cmd;
   s = s.Strip(TString::kBoth);
   if (!s.Length()) return 0;
   
   if (s.BeginsWith(".L") || s.BeginsWith(".x") || s.BeginsWith(".X")) {
      TString file = s(2, s.Length());
      TString acm, arg, io;
      TString filename = gSystem->SplitAclicMode(file, acm, arg, io);
      char *fn = gSystem->Which(TROOT::GetMacroPath(), filename, kReadPermission);
      if (fn) {
         if (GetNumberOfUniqueSlaves() > 0) {
            if (SendFile(fn, kAscii | kForward) < 0) {
               Error("Exec", "file %s could not be transfered", fn);
               delete [] fn;
               return -1;
            }
         } else {
            TString scmd = s(0,3) + fn;
            Int_t n = SendCommand(scmd, list);
            delete [] fn;
            return n;
         }
      } else {
         Error("Exec", "macro %s not found", file.Data());
         return -1;
      }
      delete [] fn;
   }
   if (plusMaster) {
      Int_t n = GetParallel();
      SetParallelSilent(0);
      Int_t res = SendCommand(cmd, list);
      SetParallelSilent(n);
      if (res < 0)
         return res;
   }
   return SendCommand(cmd, list);
}
Int_t TProof::SendCommand(const char *cmd, ESlaves list)
{
   
   
   
   
   
   
   
   if (!IsValid()) return -1;
   Broadcast(cmd, kMESS_CINT, list);
   Collect(list);
   return fStatus;
}
Int_t TProof::SendCurrentState(ESlaves list)
{
   
   
   
   if (!IsValid()) return -1;
   
   
   Broadcast(gDirectory->GetPath(), kPROOF_RESET, list);
   return GetParallel();
}
Int_t TProof::SendInitialState()
{
   
   
   
   if (!IsValid()) return -1;
   SetLogLevel(fLogLevel, gProofDebugMask);
   return GetNumberOfActiveSlaves();
}
Bool_t TProof::CheckFile(const char *file, TSlave *slave, Long_t modtime)
{
   
   
   
   
   
   
   
   
   
   
   
   Bool_t sendto = kFALSE;
   
   TString sn = slave->GetName();
   sn += ":";
   sn += slave->GetOrdinal();
   sn += ":";
   sn += gSystem->BaseName(file);
   
   FileMap_t::const_iterator it;
   if ((it = fFileMap.find(sn)) != fFileMap.end()) {
      
      MD5Mod_t md = (*it).second;
      if (md.fModtime != modtime) {
         TMD5 *md5 = TMD5::FileChecksum(file);
         if (md5) {
            if ((*md5) != md.fMD5) {
               sendto       = kTRUE;
               md.fMD5      = *md5;
               md.fModtime  = modtime;
               fFileMap[sn] = md;
               
               
               
               
               
               if (IsMaster()) {
                  sendto = kFALSE;
                  TMessage mess(kPROOF_CHECKFILE);
                  mess << TString(gSystem->BaseName(file)) << md.fMD5;
                  slave->GetSocket()->Send(mess);
                  TMessage *reply;
                  slave->GetSocket()->Recv(reply);
                  if (reply->What() != kPROOF_CHECKFILE)
                     sendto = kTRUE;
                  delete reply;
               }
            }
            delete md5;
         } else {
            Error("CheckFile", "could not calculate local MD5 check sum - dont send");
            return kFALSE;
         }
      }
   } else {
      
      TMD5 *md5 = TMD5::FileChecksum(file);
      MD5Mod_t md;
      if (md5) {
         md.fMD5      = *md5;
         md.fModtime  = modtime;
         fFileMap[sn] = md;
         delete md5;
      } else {
         Error("CheckFile", "could not calculate local MD5 check sum - dont send");
         return kFALSE;
      }
      TMessage mess(kPROOF_CHECKFILE);
      mess << TString(gSystem->BaseName(file)) << md.fMD5;
      slave->GetSocket()->Send(mess);
      TMessage *reply;
      slave->GetSocket()->Recv(reply);
      sendto = (!reply || reply->What() != kPROOF_CHECKFILE) ? kTRUE : kFALSE;
      if (reply)
         delete reply;
      else
         Error("CheckFile", "received empty message from worker: %s", slave->GetName());
   }
   return sendto;
}
Int_t TProof::SendFile(const char *file, Int_t opt, const char *rfile, TSlave *wrk)
{
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   if (!IsValid()) return -1;
   
   TList *slaves = fActiveSlaves;
   
   if (wrk) {
      slaves = new TList();
      slaves->Add(wrk);
   }
   if (slaves->GetSize() == 0) return 0;
#ifndef R__WIN32
   Int_t fd = open(file, O_RDONLY);
#else
   Int_t fd = open(file, O_RDONLY | O_BINARY);
#endif
   if (fd < 0) {
      SysError("SendFile", "cannot open file %s", file);
      return -1;
   }
   
   Long64_t size;
   Long_t id, flags, modtime;
   if (gSystem->GetPathInfo(file, &id, &size, &flags, &modtime) == 1) {
      Error("SendFile", "cannot stat file %s", file);
      return -1;
   }
   if (size == 0) {
      Error("SendFile", "empty file %s", file);
      return -1;
   }
   
   Bool_t bin   = (opt & kBinary)  ? kTRUE : kFALSE;
   Bool_t force = (opt & kForce)   ? kTRUE : kFALSE;
   Bool_t fw    = (opt & kForward) ? kTRUE : kFALSE;
   const Int_t kMAXBUF = 32768;  
   char buf[kMAXBUF];
   Int_t nsl = 0;
   TIter next(slaves);
   TSlave *sl;
   const char *fnam = (rfile) ? rfile : gSystem->BaseName(file);
   while ((sl = (TSlave *)next())) {
      if (!sl->IsValid())
         continue;
      Bool_t sendto = force ? kTRUE : CheckFile(file, sl, modtime);
      
      
      
      if (sl->fSlaveType == TSlave::kSlave && !sendto)
         continue;
      
      
      size = sendto ? size : 0;
      PDB(kPackage,2)
         if (size > 0) {
            if (!nsl)
               Info("SendFile", "sending file %s to:", file);
            printf("   slave = %s:%s\n", sl->GetName(), sl->GetOrdinal());
         }
      sprintf(buf, "%s %d %lld %d", fnam, bin, size, fw);
      if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
         MarkBad(sl);
         continue;
      }
      if (!sendto)
         continue;
      lseek(fd, 0, SEEK_SET);
      Int_t len;
      do {
         while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();
         if (len < 0) {
            SysError("SendFile", "error reading from file %s", file);
            Interrupt(kSoftInterrupt, kActive);
            close(fd);
            return -1;
         }
         if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
            SysError("SendFile", "error writing to slave %s:%s (now offline)",
                     sl->GetName(), sl->GetOrdinal());
            MarkBad(sl);
            break;
         }
      } while (len > 0);
      nsl++;
   }
   close(fd);
   
   if (slaves != fActiveSlaves)
      SafeDelete(slaves);
   return nsl;
}
Int_t TProof::SendObject(const TObject *obj, ESlaves list)
{
   
   
   if (!IsValid() || !obj) return -1;
   TMessage mess(kMESS_OBJECT);
   mess.WriteObject(obj);
   return Broadcast(mess, list);
}
Int_t TProof::SendPrint(Option_t *option)
{
   
   
   if (!IsValid()) return -1;
   Broadcast(option, kPROOF_PRINT, kActive);
   return Collect(kActive);
}
void TProof::SetLogLevel(Int_t level, UInt_t mask)
{
   
   char str[32];
   fLogLevel        = level;
   gProofDebugLevel = level;
   gProofDebugMask  = (TProofDebug::EProofDebugMask) mask;
   sprintf(str, "%d %u", level, mask);
   Broadcast(str, kPROOF_LOGLEVEL, kAll);
}
void TProof::SetRealTimeLog(Bool_t on)
{
   
   
   
   
   
   if (IsValid()) {
      TMessage mess(kPROOF_REALTIMELOG);
      mess << on;
      Broadcast(mess);
   } else {
      Warning("SetRealTimeLog","session is invalid - do nothing");
   }
}
Int_t TProof::SetParallelSilent(Int_t nodes, Bool_t random)
{
   
   
   
   if (!IsValid()) return -1;
   if (IsMaster()) {
      GoParallel(nodes, kFALSE, random);
      return SendCurrentState();
   } else {
      PDB(kGlobal,1) Info("SetParallelSilent", "request %d node%s", nodes,
          nodes == 1 ? "" : "s");
      TMessage mess(kPROOF_PARALLEL);
      mess << nodes << random;
      Broadcast(mess);
      Collect();
      Int_t n = GetParallel();
      PDB(kGlobal,1) Info("SetParallelSilent", "got %d node%s", n, n == 1 ? "" : "s");
      return n;
   }
}
Int_t TProof::SetParallel(Int_t nodes, Bool_t random)
{
   
   
   Int_t n = SetParallelSilent(nodes, random);
   if (!IsMaster()) {
      if (n < 1) {
         Printf("PROOF set to sequential mode");
      } else {
         TString subfix = (n == 1) ? "" : "s";
         if (random)
            subfix += ", randomly selected";
         Printf("PROOF set to parallel mode (%d worker%s)", n, subfix.Data());
      }
   }
   return n;
}
Int_t TProof::GoParallel(Int_t nodes, Bool_t attach, Bool_t random)
{
   
   
   
   
   
   
   if (!IsValid()) return -1;
   if (nodes < 0) nodes = 0;
   fActiveSlaves->Clear();
   fActiveMonitor->RemoveAll();
   
   
   TSlave *sl = 0;
   TList *wlst = new TList;
   TIter nxt(fSlaves);
   fInactiveSlaves->Clear();
   while ((sl = (TSlave *)nxt())) {
      if (sl->IsValid() && !fBadSlaves->FindObject(sl)) {
         if (strcmp("IGNORE", sl->GetImage()) == 0) continue;
         if ((sl->GetSlaveType() != TSlave::kSlave) &&
             (sl->GetSlaveType() != TSlave::kMaster)) {
            Error("GoParallel", "TSlave is neither Master nor Slave");
            R__ASSERT(0);
         }
         
         wlst->Add(sl);
         
         fInactiveSlaves->Add(sl);
         sl->SetStatus(TSlave::kInactive);
      }
   }
   Int_t nwrks = (nodes > wlst->GetSize()) ? wlst->GetSize() : nodes;
   int cnt = 0;
   fEndMaster = IsMaster() ? kTRUE : kFALSE;
   while (cnt < nwrks) {
      
      if (random) {
         Int_t iwrk = (Int_t) (gRandom->Rndm() * wlst->GetSize());
         sl = (TSlave *) wlst->At(iwrk);
      } else {
         
         sl = (TSlave *) wlst->First();
      }
      if (!sl) {
         Error("GoParallel", "attaching to candidate!");
         break;
      }
      Int_t slavenodes = 0;
      if (sl->GetSlaveType() == TSlave::kSlave) {
         sl->SetStatus(TSlave::kActive);
         fActiveSlaves->Add(sl);
         fInactiveSlaves->Remove(sl);
         fActiveMonitor->Add(sl->GetSocket());
         slavenodes = 1;
      } else if (sl->GetSlaveType() == TSlave::kMaster) {
         fEndMaster = kFALSE;
         TMessage mess(kPROOF_PARALLEL);
         if (!attach) {
            mess << nodes-cnt;
         } else {
            
            mess.SetWhat(kPROOF_LOGFILE);
            mess << -1 << -1;
         }
         if (sl->GetSocket()->Send(mess) == -1) {
            MarkBad(sl);
            slavenodes = 0;
         } else {
            Collect(sl);
            sl->SetStatus(TSlave::kActive);
            fActiveSlaves->Add(sl);
            fInactiveSlaves->Remove(sl);
            fActiveMonitor->Add(sl->GetSocket());
            if (sl->GetParallel() > 0) {
               slavenodes = sl->GetParallel();
            } else {
               slavenodes = 0;
            }
         }
      }
      
      wlst->Remove(sl);
      cnt += slavenodes;
   }
   
   wlst->SetOwner(0);
   SafeDelete(wlst);
   
   AskStatistics();
   
   FindUniqueSlaves();
   
   if (!attach)
      SendGroupView();
   Int_t n = GetParallel();
   if (!IsMaster()) {
      if (n < 1)
         printf("PROOF set to sequential mode\n");
      else
         printf("PROOF set to parallel mode (%d worker%s)\n",
                n, n == 1 ? "" : "s");
   }
   PDB(kGlobal,1) Info("GoParallel", "got %d node%s", n, n == 1 ? "" : "s");
   return n;
}
void TProof::ShowCache(Bool_t all)
{
   
   
   if (!IsValid()) return;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kShowCache) << all;
   Broadcast(mess, kUnique);
   if (all) {
      TMessage mess2(kPROOF_CACHE);
      mess2 << Int_t(kShowSubCache) << all;
      Broadcast(mess2, fNonUniqueMasters);
      Collect(kAllUnique);
   } else {
      Collect(kUnique);
   }
}
void TProof::ClearCache()
{
    
   if (!IsValid()) return;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kClearCache);
   Broadcast(mess, kUnique);
   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kClearSubCache);
   Broadcast(mess2, fNonUniqueMasters);
   Collect(kAllUnique);
   
   fFileMap.clear();
}
void TProof::ShowPackages(Bool_t all)
{
   
   
   
   if (!IsValid()) return;
   if (!IsMaster()) {
      if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
         
         TIter nxd(fGlobalPackageDirList);
         TNamed *nm = 0;
         while ((nm = (TNamed *)nxd())) {
            printf("*** Global Package cache %s client:%s ***\n",
                   nm->GetName(), nm->GetTitle());
            fflush(stdout);
            gSystem->Exec(Form("%s %s", kLS, nm->GetTitle()));
            printf("\n");
            fflush(stdout);
         }
      }
      printf("*** Package cache client:%s ***\n", fPackageDir.Data());
      fflush(stdout);
      gSystem->Exec(Form("%s %s", kLS, fPackageDir.Data()));
   }
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kShowPackages) << all;
   Broadcast(mess, kUnique);
   if (all) {
      TMessage mess2(kPROOF_CACHE);
      mess2 << Int_t(kShowSubPackages) << all;
      Broadcast(mess2, fNonUniqueMasters);
      Collect(kAllUnique);
   } else {
      Collect(kUnique);
   }
}
void TProof::ShowEnabledPackages(Bool_t all)
{
   
   
   
   if (!IsValid()) return;
   if (!IsMaster()) {
      printf("*** Enabled packages on client on %s\n", gSystem->HostName());
      TIter next(fEnabledPackagesOnClient);
      while (TObjString *str = (TObjString*) next())
         printf("%s\n", str->GetName());
   }
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kShowEnabledPackages) << all;
   Broadcast(mess);
   Collect();
}
Int_t TProof::ClearPackages()
{
   
   
   if (!IsValid()) return -1;
   if (UnloadPackages() == -1)
      return -1;
   if (DisablePackages() == -1)
      return -1;
   return fStatus;
}
Int_t TProof::ClearPackage(const char *package)
{
   
   
   if (!IsValid()) return -1;
   if (!package || !strlen(package)) {
      Error("ClearPackage", "need to specify a package name");
      return -1;
   }
   
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);
   if (UnloadPackage(pac) == -1)
      return -1;
   if (DisablePackage(pac) == -1)
      return -1;
   return fStatus;
}
R__HIDDEN Int_t TProof::DisablePackage(const char *package)
{
   
   
   if (!IsValid()) return -1;
   if (!package || !strlen(package)) {
      Error("DisablePackage", "need to specify a package name");
      return -1;
   }
   
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);
   if (DisablePackageOnClient(pac) == -1)
      return -1;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kDisablePackage) << pac;
   Broadcast(mess, kUnique);
   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kDisableSubPackage) << pac;
   Broadcast(mess2, fNonUniqueMasters);
   Collect(kAllUnique);
   return fStatus;
}
R__HIDDEN Int_t TProof::DisablePackageOnClient(const char *package)
{
   
   
   if (!IsMaster()) {
      
      fPackageLock->Lock();
      gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(), package));
      gSystem->Exec(Form("%s %s/%s.par", kRM, fPackageDir.Data(), package));
      fPackageLock->Unlock();
   }
   return 0;
}
R__HIDDEN Int_t TProof::DisablePackages()
{
   
   
   if (!IsValid()) return -1;
   
   if (!IsMaster()) {
      fPackageLock->Lock();
      gSystem->Exec(Form("%s %s/*", kRM, fPackageDir.Data()));
      fPackageLock->Unlock();
   }
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kDisablePackages);
   Broadcast(mess, kUnique);
   TMessage mess2(kPROOF_CACHE);
   mess2 << Int_t(kDisableSubPackages);
   Broadcast(mess2, fNonUniqueMasters);
   Collect(kAllUnique);
   return fStatus;
}
Int_t TProof::BuildPackage(const char *package, EBuildPackageOpt opt)
{
   
   
   
   
   
   
   
   
   if (!IsValid()) return -1;
   if (!package || !strlen(package)) {
      Error("BuildPackage", "need to specify a package name");
      return -1;
   }
   
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);
   Bool_t buildOnClient = kTRUE;
   if (opt == kDontBuildOnClient) {
      buildOnClient = kFALSE;
      opt = kBuildAll;
   }
   if (opt <= kBuildAll) {
      TMessage mess(kPROOF_CACHE);
      mess << Int_t(kBuildPackage) << pac;
      Broadcast(mess, kUnique);
      TMessage mess2(kPROOF_CACHE);
      mess2 << Int_t(kBuildSubPackage) << pac;
      Broadcast(mess2, fNonUniqueMasters);
   }
   if (opt >= kBuildAll) {
      
      
      Int_t st = 0;
      if (buildOnClient)
         st = BuildPackageOnClient(pac);
      Collect(kAllUnique);
      if (fStatus < 0 || st < 0)
         return -1;
   }
   return 0;
}
Int_t TProof::BuildPackageOnClient(const TString &package)
{
   
   
   
   
   
   if (!IsMaster()) {
      Int_t status = 0;
      TString pdir, ocwd;
      
      pdir = fPackageDir + "/" + package;
      if (gSystem->AccessPathName(pdir, kReadPermission) ||
         gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
         
         if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
            
            TIter nxd(fGlobalPackageDirList);
            TNamed *nm = 0;
            while ((nm = (TNamed *)nxd())) {
               pdir = Form("%s/%s", nm->GetTitle(), package.Data());
               if (!gSystem->AccessPathName(pdir, kReadPermission) &&
                   !gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
                  
                  break;
               }
               pdir = "";
            }
            if (pdir.Length() <= 0) {
               
               Error("BuildPackageOnClient", "failure locating %s ...", package.Data());
               return -1;
            } else {
               
               if (gDebug > 0)
                  Info("BuildPackageOnClient", "found global package: %s", pdir.Data());
               return 0;
            }
         }
      }
      PDB(kPackage, 1)
         Info("BuildPackageOnCLient",
              "package %s exists and has PROOF-INF directory", package.Data());
      fPackageLock->Lock();
      ocwd = gSystem->WorkingDirectory();
      gSystem->ChangeDirectory(pdir);
      
      if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
         
         
         FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
         if (f) {
            TString v;
            v.Gets(f);
            fclose(f);
            if (v != gROOT->GetVersion()) {
               if (gSystem->Exec("PROOF-INF/BUILD.sh clean")) {
                  Error("BuildPackageOnClient", "cleaning package %s on the client failed", package.Data());
                  status = -1;
               }
            }
         }
         if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
            Error("BuildPackageOnClient", "building package %s on the client failed", package.Data());
            status = -1;
         }
         f = fopen("PROOF-INF/proofvers.txt", "w");
         if (f) {
            fputs(gROOT->GetVersion(), f);
            fclose(f);
         }
      } else {
         PDB(kPackage, 1)
            Info("BuildPackageOnCLient",
                 "package %s exists but has no PROOF-INF/BUILD.sh script", package.Data());
      }
      gSystem->ChangeDirectory(ocwd);
      fPackageLock->Unlock();
      return status;
   }
   return 0;
}
Int_t TProof::LoadPackage(const char *package, Bool_t notOnClient)
{
   
   
   
   
   if (!IsValid()) return -1;
   if (!package || !strlen(package)) {
      Error("LoadPackage", "need to specify a package name");
      return -1;
   }
   
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);
   if (!notOnClient)
      if (LoadPackageOnClient(pac) == -1)
         return -1;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kLoadPackage) << pac;
   Broadcast(mess);
   Collect();
   return fStatus;
}
R__HIDDEN Int_t TProof::LoadPackageOnClient(const TString &package)
{
   
   
   
   
   if (!IsMaster()) {
      Int_t status = 0;
      TString pdir, ocwd;
      
      if (fEnabledPackagesOnClient->FindObject(package)) {
         Info("LoadPackageOnClient",
              "package %s already loaded", package.Data());
         return 0;
      }
      
      pdir = fPackageDir + "/" + package;
      if (gSystem->AccessPathName(pdir, kReadPermission)) {
         
         if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
            
            TIter nxd(fGlobalPackageDirList);
            TNamed *nm = 0;
            while ((nm = (TNamed *)nxd())) {
               pdir = Form("%s/%s", nm->GetTitle(), package.Data());
               if (!gSystem->AccessPathName(pdir, kReadPermission)) {
                  
                  break;
               }
               pdir = "";
            }
            if (pdir.Length() <= 0) {
               
               Error("LoadPackageOnClient", "failure locating %s ...", package.Data());
               return -1;
            }
         }
      }
      ocwd = gSystem->WorkingDirectory();
      gSystem->ChangeDirectory(pdir);
      
      if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
         Int_t err = 0;
         Int_t errm = gROOT->Macro("PROOF-INF/SETUP.C", &err);
         if (errm < 0)
            status = -1;
         if (err > TInterpreter::kNoError && err <= TInterpreter::kFatal)
            status = -1;
      } else {
         PDB(kPackage, 1)
            Info("LoadPackageOnCLient",
                 "package %s exists but has no PROOF-INF/SETUP.C script", package.Data());
      }
      gSystem->ChangeDirectory(ocwd);
      if (!status) {
         
         fPackageLock->Lock();
         FileStat_t stat;
         Int_t st = gSystem->GetPathInfo(package, stat);
         
         
         
         if (stat.fIsLink)
            gSystem->Unlink(package);
         else if (st == 0) {
            Error("LoadPackageOnClient", "cannot create symlink %s in %s on client, "
                  "another item with same name already exists", package.Data(), ocwd.Data());
            fPackageLock->Unlock();
            return -1;
         }
         gSystem->Symlink(pdir, package);
         fPackageLock->Unlock();
         
         gSystem->AddIncludePath(TString("-I") + package);
         
         gROOT->ProcessLine(TString(".include ") + package);
         fEnabledPackagesOnClient->Add(new TObjString(package));
         PDB(kPackage, 1)
            Info("LoadPackageOnClient",
                 "package %s successfully loaded", package.Data());
      } else
         Error("LoadPackageOnClient", "loading package %s on client failed", package.Data());
      return status;
   }
   return 0;
}
R__HIDDEN Int_t TProof::UnloadPackage(const char *package)
{
   
   
   if (!IsValid()) return -1;
   if (!package || !strlen(package)) {
      Error("UnloadPackage", "need to specify a package name");
      return -1;
   }
   
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);
   if (UnloadPackageOnClient(pac) == -1)
      return -1;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kUnloadPackage) << pac;
   Broadcast(mess);
   Collect();
   return fStatus;
}
R__HIDDEN Int_t TProof::UnloadPackageOnClient(const char *package)
{
   
   
   
   
   if (!IsMaster()) {
      TObjString *pack = (TObjString *) fEnabledPackagesOnClient->FindObject(package);
      if (pack) {
         
         TString aclicincpath = gSystem->GetIncludePath();
         TString cintincpath = gInterpreter->GetIncludePath();
         
         aclicincpath.Remove(aclicincpath.Length() - cintincpath.Length() - 1);
         
         aclicincpath.ReplaceAll(TString(" -I") + package, "");
         gSystem->SetIncludePath(aclicincpath);
         
         
         delete fEnabledPackagesOnClient->Remove(pack);
      }
      
      if (!gSystem->AccessPathName(package))
         if (gSystem->Unlink(package) != 0)
            Warning("UnloadPackageOnClient", "unable to remove symlink to %s", package);
   }
   return 0;
}
R__HIDDEN Int_t TProof::UnloadPackages()
{
   
   
   if (!IsValid()) return -1;
   if (!IsMaster()) {
      
      TIter nextpackage(fEnabledPackagesOnClient);
      while (TObjString *objstr = dynamic_cast<TObjString*>(nextpackage()))
         if (UnloadPackageOnClient(objstr->String()) == -1 )
            return -1;
   }
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kUnloadPackages);
   Broadcast(mess);
   Collect();
   return fStatus;
}
Int_t TProof::EnablePackage(const char *package, Bool_t notOnClient)
{
   
   
   
   
   
   if (!IsValid()) return -1;
   if (!package || !strlen(package)) {
      Error("EnablePackage", "need to specify a package name");
      return -1;
   }
   
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);
   EBuildPackageOpt opt = kBuildAll;
   if (notOnClient)
      opt = kDontBuildOnClient;
   if (BuildPackage(pac, opt) == -1)
      return -1;
   if (LoadPackage(pac, notOnClient) == -1)
      return -1;
   return 0;
}
Int_t TProof::UploadPackage(const char *pack, EUploadPackageOpt opt)
{
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   if (!IsValid()) return -1;
   TString par = pack;
   if (!par.EndsWith(".par"))
      
      par += ".par";
   
   gSystem->ExpandPathName(par);
   if (gSystem->AccessPathName(par, kReadPermission)) {
      TString tried = par;
      
      par = Form("%s/%s", fPackageDir.Data(), gSystem->BaseName(par));
      if (gSystem->AccessPathName(par, kReadPermission)) {
         
         if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
            
            TIter nxd(fGlobalPackageDirList);
            TNamed *nm = 0;
            TString pdir;
            while ((nm = (TNamed *)nxd())) {
               pdir = Form("%s/%s", nm->GetTitle(), pack);
               if (!gSystem->AccessPathName(pdir, kReadPermission)) {
                  
                  break;
               }
               pdir = "";
            }
            if (pdir.Length() > 0) {
               
               if (gDebug > 0)
                  Info("UploadPackage", "global package found (%s): no upload needed",
                                        pdir.Data());
               return 0;
            }
         }
         Error("UploadPackage", "PAR file '%s' not found; paths tried: %s, %s",
                                gSystem->BaseName(par), tried.Data(), par.Data());
         return -1;
      }
   }
   
   
   
   
   
   
   
   
   
   
   TMD5 *md5 = TMD5::FileChecksum(par);
   if (UploadPackageOnClient(par, opt, md5) == -1) {
      delete md5;
      return -1;
   }
   TMessage mess(kPROOF_CHECKFILE);
   mess << TString("+")+TString(gSystem->BaseName(par)) << (*md5);
   TMessage mess2(kPROOF_CHECKFILE);
   mess2 << TString("-")+TString(gSystem->BaseName(par)) << (*md5);
   TMessage mess3(kPROOF_CHECKFILE);
   mess3 << TString("=")+TString(gSystem->BaseName(par)) << (*md5);
   delete md5;
   if (fProtocol > 8) {
      
      mess << (UInt_t) opt;
      mess2 << (UInt_t) opt;
      mess3 << (UInt_t) opt;
   }
   
   TIter next(fUniqueSlaves);
   TSlave *sl = 0;
   while ((sl = (TSlave *) next())) {
      if (!sl->IsValid())
         continue;
      sl->GetSocket()->Send(mess);
      TMessage *reply;
      sl->GetSocket()->Recv(reply);
      if (reply->What() != kPROOF_CHECKFILE) {
         if (fProtocol > 5) {
            
            if (SendFile(par, (kBinary | kForce), Form("%s/%s/%s",
                         sl->GetProofWorkDir(), kPROOF_PackDir,
                         gSystem->BaseName(par)), sl) < 0) {
               Error("UploadPackage", "problems uploading file %s", par.Data());
               SafeDelete(reply);
               return -1;
            }
         } else {
            
            TFTP ftp(TString("root://")+sl->GetName(), 1);
            if (!ftp.IsZombie()) {
               ftp.cd(Form("%s/%s", sl->GetProofWorkDir(), kPROOF_PackDir));
               ftp.put(par, gSystem->BaseName(par));
            }
         }
         
         sl->GetSocket()->Send(mess2);
         SafeDelete(reply);
         sl->GetSocket()->Recv(reply);
         if (!reply || reply->What() != kPROOF_CHECKFILE) {
            Error("UploadPackage", "unpacking of package %s failed", par.Data());
            SafeDelete(reply);
            return -1;
         }
      }
      SafeDelete(reply);
   }
   
   TIter nextmaster(fNonUniqueMasters);
   TSlave *ma;
   while ((ma = (TSlave *) nextmaster())) {
      if (!ma->IsValid())
         continue;
      ma->GetSocket()->Send(mess3);
      TMessage *reply = 0;
      ma->GetSocket()->Recv(reply);
      if (!reply || reply->What() != kPROOF_CHECKFILE) {
         
         Error("UploadPackage", "package %s did not exist on submaster %s",
               par.Data(), ma->GetOrdinal());
         SafeDelete(reply);
         return -1;
      }
      SafeDelete(reply);
   }
   return 0;
}
Int_t TProof::UploadPackageOnClient(const TString &par, EUploadPackageOpt opt, TMD5 *md5)
{
   
   
   
   
   
   
   
   
   
   
   Int_t status = 0;
   if (!IsMaster()) {
      
      
      
      fPackageLock->Lock();
      TString lpar = fPackageDir + "/" + gSystem->BaseName(par);
      FileStat_t stat;
      Int_t st = gSystem->GetPathInfo(lpar, stat);
      
      
      
      if (stat.fIsLink)
         gSystem->Unlink(lpar);
      else if (st == 0) {
         Error("UploadPackageOnClient", "cannot create symlink %s on client, "
               "another item with same name already exists",
               lpar.Data());
         fPackageLock->Unlock();
         return -1;
      }
      if (!gSystem->IsAbsoluteFileName(par)) {
         TString fpar = par;
         gSystem->Symlink(gSystem->PrependPathName(gSystem->WorkingDirectory(), fpar), lpar);
      } else
         gSystem->Symlink(par, lpar);
      
      
      TString packnam = par(0, par.Length() - 4);  
      packnam = gSystem->BaseName(packnam);        
      TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
      TMD5 *md5local = TMD5::ReadChecksum(md5f);
      if (!md5local || (*md5) != (*md5local)) {
         
         Int_t st = 0;
         if ((opt & TProof::kRemoveOld)) {
            
            st = gSystem->Exec(Form("%s %s/%s", kRM, fPackageDir.Data(),
                               packnam.Data()));
            if (st)
               Error("UploadPackageOnClient", "failure executing: %s %s/%s",
                     kRM, fPackageDir.Data(), packnam.Data());
         }
         
         char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
                                       kExecutePermission);
         if (gunzip) {
            
            st = gSystem->Exec(Form(kUNTAR2, gunzip, par.Data(), fPackageDir.Data()));
            if (st)
               Error("Uploadpackage", "failure executing: %s",
                     Form(kUNTAR2, gunzip, par.Data(), fPackageDir.Data()));
            delete [] gunzip;
         } else
            Error("UploadPackageOnClient", "%s not found", kGUNZIP);
         
         if (gSystem->AccessPathName(fPackageDir + "/" + packnam, kWritePermission)) {
            
            Error("UploadPackageOnClient",
                  "package %s did not unpack into %s/%s", par.Data(), fPackageDir.Data(),
                  packnam.Data());
            status = -1;
         } else {
            
            TMD5::WriteChecksum(md5f, md5);
         }
      }
      fPackageLock->Unlock();
      delete md5local;
   }
   return status;
}
Int_t TProof::Load(const char *macro, Bool_t notOnClient)
{
   
   
   
   
   
   if (!IsValid()) return -1;
   if (!macro || !strlen(macro)) {
      Error("Load", "need to specify a macro name");
      return -1;
   }
   if (!IsMaster()) {
      
      TString implname = macro;
      TString acmode, args, io;
      implname = gSystem->SplitAclicMode(implname, acmode, args, io);
      
      Int_t dot = implname.Last('.');
      if (dot == kNPOS) {
         Info("Load", "macro '%s' does not contain a '.': do nothing", macro);
         return -1;
      }
      
      Bool_t hasHeader = kTRUE;
      TString headname = implname;
      headname.Remove(dot);
      headname += ".h";
      if (gSystem->AccessPathName(headname, kReadPermission)) {
         TString h = headname;
         headname.Remove(dot);
         headname += ".hh";
         if (gSystem->AccessPathName(headname, kReadPermission)) {
            hasHeader = kFALSE;
            if (gDebug > 0)
               Info("Load", "no associated header file found: tried: %s %s",
                            h.Data(), headname.Data());
         }
      }
      
      
      if (SendFile(implname) == -1) {
         Info("Load", "problems sending implementation file %s", implname.Data());
         return -1;
      }
      if (hasHeader)
         if (SendFile(headname) == -1) {
            Info("Load", "problems sending header file %s", headname.Data());
            return -1;
         }
      
      TString basemacro = gSystem->BaseName(macro);
      TMessage mess(kPROOF_CACHE);
      mess << Int_t(kLoadMacro) << basemacro;
      Broadcast(mess, kUnique);
      
      if (!notOnClient)
         
         
         gROOT->ProcessLine(Form(".L %s", macro));
      
      Collect(kAllUnique);
   } else {
      
      
      
      
      TString basemacro = gSystem->BaseName(macro);
      TMessage mess(kPROOF_CACHE);
      mess << Int_t(kLoadMacro) << basemacro;
      Broadcast(mess, kUnique);
   }
   
   return 0;
}
Int_t TProof::AddDynamicPath(const char *libpath)
{
   
   
   
   
   if ((!libpath || !strlen(libpath))) {
      if (gDebug > 0)
         Info("AddDynamicPath", "list is empty - nothing to do");
      return 0;
   }
   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("lib") << (Bool_t)kTRUE;
   
   if (libpath && strlen(libpath))
      m << TString(libpath);
   else
      m << TString("-");
   
   Broadcast(m);
   Collect();
   return 0;
}
Int_t TProof::AddIncludePath(const char *incpath)
{
   
   
   
   
   if ((!incpath || !strlen(incpath))) {
      if (gDebug > 0)
         Info("AddIncludePath", "list is empty - nothing to do");
      return 0;
   }
   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("inc") << (Bool_t)kTRUE;
   
   if (incpath && strlen(incpath))
      m << TString(incpath);
   else
      m << TString("-");
   
   Broadcast(m);
   Collect();
   return 0;
}
Int_t TProof::RemoveDynamicPath(const char *libpath)
{
   
   
   
   
   if ((!libpath || !strlen(libpath))) {
      if (gDebug > 0)
         Info("AddDynamicPath", "list is empty - nothing to do");
      return 0;
   }
   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("lib") <<(Bool_t)kFALSE;
   
   if (libpath && strlen(libpath))
      m << TString(libpath);
   else
      m << TString("-");
   
   Broadcast(m);
   Collect();
   return 0;
}
Int_t TProof::RemoveIncludePath(const char *incpath)
{
   
   
   
   
   if ((!incpath || !strlen(incpath))) {
      if (gDebug > 0)
         Info("RemoveIncludePath", "list is empty - nothing to do");
      return 0;
   }
   TMessage m(kPROOF_LIB_INC_PATH);
   m << TString("inc") << (Bool_t)kFALSE;
   
   if (incpath && strlen(incpath))
      m << TString(incpath);
   else
      m << TString("-");
   
   Broadcast(m);
   Collect();
   return 0;
}
TList *TProof::GetListOfPackages()
{
   
   if (!IsValid())
      return (TList *)0;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kListPackages);
   Broadcast(mess);
   Collect();
   return fAvailablePackages;
}
TList *TProof::GetListOfEnabledPackages()
{
   
   if (!IsValid())
      return (TList *)0;
   TMessage mess(kPROOF_CACHE);
   mess << Int_t(kListEnabledPackages);
   Broadcast(mess);
   Collect();
   return fEnabledPackages;
}
void TProof::PrintProgress(Long64_t total, Long64_t processed, Float_t procTime)
{
   
   fprintf(stderr, "[TProof::Progress] Total %lld events\t|", total);
   for (int l = 0; l < 20; l++) {
      if (total > 0) {
         if (l < 20*processed/total)
            fprintf(stderr, "=");
         else if (l == 20*processed/total)
            fprintf(stderr, ">");
         else if (l > 20*processed/total)
            fprintf(stderr, ".");
      } else
         fprintf(stderr, "=");
   }
   Float_t evtrti = (procTime > 0. && processed > 0) ? processed / procTime : -1.;
   if (evtrti > 0.)
      fprintf(stderr, "| %.02f %% [%.1f evts/s]\r",
              (total ? ((100.0*processed)/total) : 100.0), evtrti);
   else
      fprintf(stderr, "| %.02f %%\r",
              (total ? ((100.0*processed)/total) : 100.0));
   if (processed >= total)
      fprintf(stderr, "\n");
}
void TProof::Progress(Long64_t total, Long64_t processed)
{
   
   
   PDB(kGlobal,1)
      Info("Progress","%2f (%lld/%lld)", 100.*processed/total, processed, total);
   if (gROOT->IsBatch()) {
      
      PrintProgress(total, processed);
   } else {
      EmitVA("Progress(Long64_t,Long64_t)", 2, total, processed);
   }
}
void TProof::Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                      Float_t initTime, Float_t procTime,
                      Float_t evtrti, Float_t mbrti)
{
   
   
   PDB(kGlobal,1)
      Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
                                initTime, procTime, evtrti, mbrti);
   if (gROOT->IsBatch()) {
      
      PrintProgress(total, processed, procTime);
   } else {
      EmitVA("Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
             7, total, processed, bytesread, initTime, procTime, evtrti, mbrti);
   }
}
void TProof::Feedback(TList *objs)
{
   
   
   PDB(kGlobal,1)
      Info("Feedback","%d objects", objs->GetSize());
   PDB(kFeedback,1) {
      Info("Feedback","%d objects", objs->GetSize());
      objs->ls();
   }
   Emit("Feedback(TList *objs)", (Long_t) objs);
}
void TProof::CloseProgressDialog()
{
   
   PDB(kGlobal,1)
      Info("CloseProgressDialog",
           "called: have progress dialog: %d", fProgressDialogStarted);
   
   if (!fProgressDialogStarted)
      return;
   Emit("CloseProgressDialog()");
}
void TProof::ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst,
                                 Long64_t ent)
{
   
   PDB(kGlobal,1)
      Info("ResetProgressDialog","(%s,%d,%lld,%lld)", sel, sz, fst, ent);
   EmitVA("ResetProgressDialog(const char*,Int_t,Long64_t,Long64_t)",
          4, sel, sz, fst, ent);
}
void TProof::StartupMessage(const char *msg, Bool_t st, Int_t done, Int_t total)
{
   
   PDB(kGlobal,1)
      Info("StartupMessage","(%s,%d,%d,%d)", msg, st, done, total);
   EmitVA("StartupMessage(const char*,Bool_t,Int_t,Int_t)",
          4, msg, st, done, total);
}
void TProof::DataSetStatus(const char *msg, Bool_t st, Int_t done, Int_t total)
{
   
   PDB(kGlobal,1)
      Info("DataSetStatus","(%s,%d,%d,%d)", msg, st, done, total);
   EmitVA("DataSetStatus(const char*,Bool_t,Int_t,Int_t)",
          4, msg, st, done, total);
}
void TProof::SendDataSetStatus(const char *msg, UInt_t n,
                                 UInt_t tot, Bool_t st)
{
   
   if (IsMaster()) {
      TMessage mess(kPROOF_DATASET_STATUS);
      mess << TString(msg) << tot << n << st;
      gProofServ->GetSocket()->Send(mess);
   }
}
void TProof::QueryResultReady(const char *ref)
{
   
   PDB(kGlobal,1)
      Info("QueryResultReady","ref: %s", ref);
   Emit("QueryResultReady(const char*)",ref);
}
void TProof::ValidateDSet(TDSet *dset)
{
   
   if (dset->ElementsValid()) return;
   TList nodes;
   nodes.SetOwner();
   TList slholder;
   slholder.SetOwner();
   TList elemholder;
   elemholder.SetOwner();
   
   TIter nextSlave(GetListOfActiveSlaves());
   while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
      TList *sllist = 0;
      TPair *p = dynamic_cast<TPair*>(nodes.FindObject(sl->GetName()));
      if (!p) {
         sllist = new TList;
         sllist->SetName(sl->GetName());
         slholder.Add(sllist);
         TList *elemlist = new TList;
         elemlist->SetName(TString(sl->GetName())+"_elem");
         elemholder.Add(elemlist);
         nodes.Add(new TPair(sllist, elemlist));
      } else {
         sllist = dynamic_cast<TList*>(p->Key());
      }
      sllist->Add(sl);
   }
   
   TList nonLocal; 
   
   for (Int_t i = 0; i < 2; i++) {
      Bool_t local = i>0?kFALSE:kTRUE;
      TIter nextElem(local ? dset->GetListOfElements() : &nonLocal);
      while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
         if (elem->GetValid()) continue;
         TPair *p = dynamic_cast<TPair*>(local?nodes.FindObject(TUrl(elem->GetFileName()).GetHost()):nodes.At(0));
         if (p) {
            TList *eli = dynamic_cast<TList*>(p->Value());
            TList *sli = dynamic_cast<TList*>(p->Key());
            eli->Add(elem);
            
            TPair *p2 = p;
            Bool_t stop = kFALSE;
            while (!stop) {
               TPair *p3 = dynamic_cast<TPair*>(nodes.After(p2->Key()));
               if (p3) {
                  Int_t nelem = dynamic_cast<TList*>(p3->Value())->GetSize();
                  Int_t nsl = dynamic_cast<TList*>(p3->Key())->GetSize();
                  if (nelem*sli->GetSize() < eli->GetSize()*nsl) p2 = p3;
                  else stop = kTRUE;
               } else {
                  stop = kTRUE;
               }
            }
            if (p2!=p) {
               nodes.Remove(p->Key());
               nodes.AddAfter(p2->Key(), p);
            }
         } else {
            if (local) {
               nonLocal.Add(elem);
            } else {
               Error("ValidateDSet", "No Node to allocate TDSetElement to");
               R__ASSERT(0);
            }
         }
      }
   }
   
   TList usedslaves;
   TIter nextNode(&nodes);
   SetDSet(dset); 
   while (TPair *node = dynamic_cast<TPair*>(nextNode())) {
      TList *slaves = dynamic_cast<TList*>(node->Key());
      TList *setelements = dynamic_cast<TList*>(node->Value());
      
      Int_t nslaves = slaves->GetSize();
      Int_t nelements = setelements->GetSize();
      for (Int_t i=0; i<nslaves; i++) {
         TDSet copyset(dset->GetType(), dset->GetObjName(),
                       dset->GetDirectory());
         for (Int_t j = (i*nelements)/nslaves;
                    j < ((i+1)*nelements)/nslaves;
                    j++) {
            TDSetElement *elem =
               dynamic_cast<TDSetElement*>(setelements->At(j));
            copyset.Add(elem->GetFileName(), elem->GetObjName(),
                        elem->GetDirectory(), elem->GetFirst(),
                        elem->GetNum(), elem->GetMsd());
         }
         if (copyset.GetListOfElements()->GetSize()>0) {
            TMessage mesg(kPROOF_VALIDATE_DSET);
            mesg << ©set;
            TSlave *sl = dynamic_cast<TSlave*>(slaves->At(i));
            PDB(kGlobal,1) Info("ValidateDSet",
                                "Sending TDSet with %d elements to slave %s"
                                " to be validated",
                                copyset.GetListOfElements()->GetSize(),
                                sl->GetOrdinal());
            sl->GetSocket()->Send(mesg);
            usedslaves.Add(sl);
         }
      }
   }
   PDB(kGlobal,1) Info("ValidateDSet","Calling Collect");
   Collect(&usedslaves);
   SetDSet(0);
}
void TProof::AddInput(TObject *obj)
{
   
   
   fPlayer->AddInput(obj);
}
void TProof::ClearInput()
{
   
   fPlayer->ClearInput();
   
   AddInput(fFeedback);
}
TObject *TProof::GetOutput(const char *name)
{
   
   
   return fPlayer->GetOutput(name);
}
TList *TProof::GetOutputList()
{
   
   return fPlayer->GetOutputList();
}
void TProof::SetParameter(const char *par, const char *value)
{
   
   
   TList *il = fPlayer->GetInputList();
   TObject *item = il->FindObject(par);
   if (item) {
      il->Remove(item);
      delete item;
   }
   il->Add(new TNamed(par, value));
}
void TProof::SetParameter(const char *par, Long_t value)
{
   
   TList *il = fPlayer->GetInputList();
   TObject *item = il->FindObject(par);
   if (item) {
      il->Remove(item);
      delete item;
   }
   il->Add(new TParameter<Long_t>(par, value));
}
void TProof::SetParameter(const char *par, Long64_t value)
{
   
   TList *il = fPlayer->GetInputList();
   TObject *item = il->FindObject(par);
   if (item) {
      il->Remove(item);
      delete item;
   }
   il->Add(new TParameter<Long64_t>(par, value));
}
void TProof::SetParameter(const char *par, Double_t value)
{
   
   TList *il = fPlayer->GetInputList();
   TObject *item = il->FindObject(par);
   if (item) {
      il->Remove(item);
      delete item;
   }
   il->Add(new TParameter<Double_t>(par, value));
}
TObject *TProof::GetParameter(const char *par) const
{
   
   
   TList *il = fPlayer->GetInputList();
   return il->FindObject(par);
}
void TProof::DeleteParameters(const char *wildcard)
{
   
   
   if (!wildcard) wildcard = "";
   TRegexp re(wildcard, kTRUE);
   Int_t nch = strlen(wildcard);
   TList *il = fPlayer->GetInputList();
   TObject *p;
   TIter next(il);
   while ((p = next())) {
      TString s = p->GetName();
      if (nch && s != wildcard && s.Index(re) == kNPOS) continue;
      il->Remove(p);
      delete p;
   }
}
void TProof::ShowParameters(const char *wildcard) const
{
   
   
   if (!wildcard) wildcard = "";
   TRegexp re(wildcard, kTRUE);
   Int_t nch = strlen(wildcard);
   TList *il = fPlayer->GetInputList();
   TObject *p;
   TIter next(il);
   while ((p = next())) {
      TString s = p->GetName();
      if (nch && s != wildcard && s.Index(re) == kNPOS) continue;
      if (p->IsA() == TNamed::Class()) {
         Printf("%s\t\t\t%s", s.Data(), p->GetTitle());
      } else if (p->IsA() == TParameter<Long_t>::Class()) {
         Printf("%s\t\t\t%ld", s.Data(), dynamic_cast<TParameter<Long_t>*>(p)->GetVal());
      } else if (p->IsA() == TParameter<Long64_t>::Class()) {
         Printf("%s\t\t\t%lld", s.Data(), dynamic_cast<TParameter<Long64_t>*>(p)->GetVal());
      } else if (p->IsA() == TParameter<Double_t>::Class()) {
         Printf("%s\t\t\t%f", s.Data(), dynamic_cast<TParameter<Double_t>*>(p)->GetVal());
      } else {
         Printf("%s\t\t\t%s", s.Data(), p->GetTitle());
      }
   }
}
void TProof::AddFeedback(const char *name)
{
   
   PDB(kFeedback, 3)
      Info("AddFeedback", "Adding object \"%s\" to feedback", name);
   if (fFeedback->FindObject(name) == 0)
      fFeedback->Add(new TObjString(name));
}
void TProof::RemoveFeedback(const char *name)
{
   
   TObject *obj = fFeedback->FindObject(name);
   if (obj != 0) {
      fFeedback->Remove(obj);
      delete obj;
   }
}
void TProof::ClearFeedback()
{
   
   fFeedback->Delete();
}
void TProof::ShowFeedback() const
{
   
   if (fFeedback->GetSize() == 0) {
      Info("","no feedback requested");
      return;
   }
   fFeedback->Print();
}
TList *TProof::GetFeedbackList() const
{
   
   return fFeedback;
}
TTree *TProof::GetTreeHeader(TDSet *dset)
{
   
   
   TList *l = GetListOfActiveSlaves();
   TSlave *sl = (TSlave*) l->First();
   if (sl == 0) {
      Error("GetTreeHeader", "No connection");
      return 0;
   }
   TSocket *soc = sl->GetSocket();
   TMessage msg(kPROOF_GETTREEHEADER);
   msg << dset;
   soc->Send(msg);
   TMessage *reply;
   Int_t d = soc->Recv(reply);
   if (reply <= 0) {
      Error("GetTreeHeader", "Error getting a replay from the master.Result %d", (int) d);
      return 0;
   }
   TString s1;
   TTree * t;
   (*reply) >> s1;
   (*reply) >> t;
   PDB(kGlobal, 1)
      if (t)
         Info("GetTreeHeader", Form("%s, message size: %d, entries: %d\n",
             s1.Data(), reply->BufferSize(), (int) t->GetMaxEntryLoop()));
      else
         Info("GetTreeHeader", Form("%s, message size: %d\n", s1.Data(), reply->BufferSize()));
   delete reply;
   return t;
}
TDrawFeedback *TProof::CreateDrawFeedback()
{
   
   
   return fPlayer->CreateDrawFeedback(this);
}
void TProof::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
{
   
   fPlayer->SetDrawFeedbackOption(f, opt);
}
void TProof::DeleteDrawFeedback(TDrawFeedback *f)
{
   
   fPlayer->DeleteDrawFeedback(f);
}
TList *TProof::GetOutputNames()
{
   
   return 0;
}
void TProof::Browse(TBrowser *b)
{
   
   b->Add(fActiveSlaves, fActiveSlaves->Class(), "fActiveSlaves");
   b->Add(&fMaster, fMaster.Class(), "fMaster");
   b->Add(fFeedback, fFeedback->Class(), "fFeedback");
   b->Add(fChains, fChains->Class(), "fChains");
   b->Add(fPlayer->GetInputList(), fPlayer->GetInputList()->Class(), "InputList");
   if (fPlayer->GetOutputList())
      b->Add(fPlayer->GetOutputList(), fPlayer->GetOutputList()->Class(), "OutputList");
   if (fPlayer->GetListOfResults())
      b->Add(fPlayer->GetListOfResults(),
             fPlayer->GetListOfResults()->Class(), "ListOfResults");
}
void TProof::SetPlayer(TVirtualProofPlayer *player)
{
   
   if (fPlayer)
      delete fPlayer;
   fPlayer = player;
};
TVirtualProofPlayer *TProof::MakePlayer(const char *player, TSocket *s)
{
   
   
   
   if (!player)
      player = "remote";
   SetPlayer(TVirtualProofPlayer::Create(player, this, s));
   return GetPlayer();
}
void TProof::AddChain(TChain *chain)
{
   
   fChains->Add(chain);
}
void TProof::RemoveChain(TChain *chain)
{
   
   fChains->Remove(chain);
}
void *TProof::SlaveStartupThread(void *arg)
{
   
   if (fgSemaphore) fgSemaphore->Wait();
   TProofThreadArg *ta = (TProofThreadArg *)arg;
   PDB(kGlobal,1)
      ::Info("TProof::SlaveStartupThread",
             "Starting slave %s on host %s", ta->fOrd.Data(), ta->fUrl->GetHost());
   TSlave *sl = 0;
   if (ta->fType == TSlave::kSlave) {
      
      sl = ta->fProof->CreateSlave(ta->fUrl->GetUrl(), ta->fOrd,
                                   ta->fPerf, ta->fImage, ta->fWorkdir);
      
      if (sl && sl->IsValid())
         sl->SetupServ(TSlave::kSlave, 0);
   } else {
      
      sl = ta->fProof->CreateSubmaster(ta->fUrl->GetUrl(), ta->fOrd,
                                       ta->fImage, ta->fMsd);
      
      if (sl && sl->IsValid())
         sl->SetupServ(TSlave::kMaster, ta->fWorkdir);
   }
   if (sl && sl->IsValid()) {
      {  R__LOCKGUARD2(gProofMutex);
         
         ta->fSlaves->Add(sl);
         if (ta->fClaims) { 
            
            TCondorSlave *c = ta->fCslave;
            ta->fClaims->Remove(c);
         }
      }
      
      PDB(kGlobal,1)
         ::Info("TProof::SlaveStartupThread",
                "slave %s on host %s created and added to list",
                ta->fOrd.Data(), ta->fUrl->GetHost());
   } else {
      
      SafeDelete(sl);
      ::Error("TProof::SlaveStartupThread",
              "slave %s on host %s could not be created",
              ta->fOrd.Data(), ta->fUrl->GetHost());
   }
   if (fgSemaphore) fgSemaphore->Post();
   return 0;
}
void TProof::GetLog(Int_t start, Int_t end)
{
   
   
   if (!IsValid() || IsMaster()) return;
   TMessage msg(kPROOF_LOGFILE);
   msg << start << end;
   Broadcast(msg, kActive);
   Collect(kActive);
}
void TProof::PutLog(TQueryResult *pq)
{
   
   if (!pq) return;
   TList *lines = pq->GetLogFile()->GetListOfLines();
   if (lines) {
      TIter nxl(lines);
      TObjString *l = 0;
      while ((l = (TObjString *)nxl()))
         EmitVA("LogMessage(const char*,Bool_t)", 2, l->GetName(), kFALSE);
   }
}
void TProof::ShowLog(const char *queryref)
{
   
   
   
   
   Retrieve(queryref);
   if (fPlayer) {
      if (queryref) {
         if (fPlayer->GetListOfResults()) {
            TIter nxq(fPlayer->GetListOfResults());
            TQueryResult *qr = 0;
            while ((qr = (TQueryResult *) nxq()))
               if (strstr(queryref, qr->GetTitle()) &&
                   strstr(queryref, qr->GetName()))
                  break;
            if (qr) {
               PutLog(qr);
               return;
            }
         }
      }
   }
}
void TProof::ShowLog(Int_t qry)
{
   
   
   
   
   
   
   
   Int_t nowlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_CUR);
   
   Int_t startlog = nowlog;
   Int_t endlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_END);
   lseek(fileno(fLogFileR), (off_t) nowlog, SEEK_SET);
   if (qry == 0) {
      startlog = 0;
      lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
   } else if (qry != -1) {
      TQueryResult *pq = 0;
      if (qry == -2) {
         
         pq = (GetQueryResults()) ? ((TQueryResult *)(GetQueryResults()->Last())) : 0;
         if (!pq) {
            GetListOfQueries();
            if (fQueries)
               pq = (TQueryResult *)(fQueries->Last());
         }
      } else if (qry > 0) {
         TList *queries = GetQueryResults();
         if (queries) {
            TIter nxq(queries);
            while ((pq = (TQueryResult *)nxq()))
               if (qry == pq->GetSeqNum())
                  break;
         }
         if (!pq) {
            queries = GetListOfQueries();
            TIter nxq(queries);
            while ((pq = (TQueryResult *)nxq()))
               if (qry == pq->GetSeqNum())
                  break;
         }
      }
      if (pq) {
         PutLog(pq);
         return;
      } else {
         if (gDebug > 0)
            Info("ShowLog","query %d not found in list", qry);
         qry = -1;
      }
   }
   
   UInt_t tolog = (UInt_t)(endlog - startlog);
   
   if (tolog <= 0)
   
   lseek(fileno(fLogFileR), (off_t) startlog, SEEK_SET);
   
   Int_t np = 0;
   char line[2048];
   Int_t wanted = (tolog > sizeof(line)) ? sizeof(line) : tolog;
   while (fgets(line, wanted, fLogFileR)) {
      Int_t r = strlen(line);
      if (!SendingLogToWindow()) {
         if (line[r-1] != '\n') line[r-1] = '\n';
         if (r > 0) {
            char *p = line;
            while (r) {
               Int_t w = write(fileno(stdout), p, r);
               if (w < 0) {
                  SysError("ShowLogFile", "error writing to stdout");
                  break;
               }
               r -= w;
               p += w;
            }
         }
         tolog -= strlen(line);
         np++;
         
         if (!(np%10)) {
            char *opt = Getline("More (y/n)? [y]");
            if (opt[0] == 'n')
               break;
         }
         
         if (tolog <= 0)
            break;
         
         wanted = (tolog > sizeof(line)) ? sizeof(line) : tolog;
      } else {
         
         if (line[r-1] == '\n') line[r-1] = 0;
         LogMessage(line, kFALSE);
      }
   }
   if (!SendingLogToWindow()) {
      
      write(fileno(stdout), "\n", 1);
   }
   
   if (qry > -1)
      lseek(fileno(fLogFileR), (off_t) nowlog, SEEK_SET);
}
void TProof::cd(Int_t id)
{
   
   
   if (GetManager()) {
      TProofDesc *d = GetManager()->GetProofDesc(id);
      if (d) {
         if (d->GetProof()) {
            gProof = d->GetProof();
            return;
         }
      }
      
      gProof = this;
   }
   return;
}
void TProof::Detach(Option_t *opt)
{
   
   
   
   if (!IsValid()) return;
   
   TSlave *sl = (TSlave *) fActiveSlaves->First();
   TSocket *s = sl->GetSocket();
   if (!sl || !(sl->IsValid()) || !s) {
      Error("Detach","corrupted worker instance: wrk:%p, sock:%p", sl, s);
      return;
   }
   Bool_t shutdown = (strchr(opt,'s') || strchr(opt,'S')) ? kTRUE : kFALSE;
   
   if (shutdown && !IsIdle()) {
      
      Remove("cleanupqueue");
      
      Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
      timeout = (timeout > 20) ? timeout : 20;
      
      StopProcess(kFALSE, (Long_t) (timeout / 2));
      
      Collect(kActive, timeout);
   }
   
   DeActivateAsyncInput();
   
   sl->FlushSocket();
   
   Close(opt);
   
   if (fProgressDialogStarted)
      CloseProgressDialog();
   
   if (GetManager() && GetManager()->QuerySessions("L")) {
      TIter nxd(GetManager()->QuerySessions("L"));
      TProofDesc *d = 0;
      while ((d = (TProofDesc *)nxd())) {
         if (d->GetProof() == this) {
            d->SetProof(0);
            GetManager()->QuerySessions("L")->Remove(d);
            break;
         }
      }
   }
   
   if (!fProgressDialogStarted)
      delete this;
   else
      
      fValid = kFALSE;
   return;
}
void TProof::SetAlias(const char *alias)
{
   
   
   
   
   TNamed::SetTitle(alias);
   if (IsMaster())
      
      TNamed::SetName(alias);
   
   if (!IsValid()) return;
   if (!IsProofd() && !IsMaster()) {
      TSlave *sl = (TSlave *) fActiveSlaves->First();
      if (sl)
         sl->SetAlias(alias);
   }
   return;
}
Int_t TProof::UploadDataSet(const char *dataSetName,
                            TList *files,
                            const char *desiredDest,
                            Int_t opt,
                            TList *skippedFiles)
{
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   if (strchr(dataSetName, '/')) {
      if (strstr(dataSetName, "public") != dataSetName) {
         Error("UploadDataSet",
               "Name of public dataset should start with public/");
         return kError;
      }
   }
   if (opt & kOverwriteAllFiles && opt & kOverwriteNoFiles
       || opt & kNoOverwriteDataSet && opt & kAppend
       || opt & kOverwriteDataSet && opt & kAppend
       || opt & kNoOverwriteDataSet && opt & kOverwriteDataSet
       || opt & kAskUser && opt & (kOverwriteDataSet |
                                   kNoOverwriteDataSet |
                                   kAppend |
                                   kOverwriteAllFiles |
                                   kOverwriteNoFiles)) {
      Error("UploadDataSet", "you specified contradicting options.");
      return kError;
   }
   
   Int_t overwriteAll = (opt & kOverwriteAllFiles) ? kTRUE : kFALSE;
   Int_t overwriteNone = (opt & kOverwriteNoFiles) ? kTRUE : kFALSE;
   Int_t goodName = (opt & (kOverwriteDataSet | kAppend)) ? 1 : -1;
   Int_t appendToDataSet = (opt & kAppend) ? kTRUE : kFALSE;
   Int_t overwriteNoDataSet = (opt & kNoOverwriteDataSet) ? kTRUE : kFALSE;
   
   if ((!skippedFiles || !&skippedFiles) && overwriteNone) {
      Error("UploadDataSet",
            "Provide pointer to TList object as skippedFiles argument when using kOverwriteNoFiles option.");
      return kError;
   }
   
   if (skippedFiles && &skippedFiles)
      if (skippedFiles->Class() != TList::Class()) {
         Error("UploadDataSet",
               "Provided skippedFiles argument does not point to a TList object.");
         return kError;
      }
   TSocket *master;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("UploadDataSet", "No connection to the master!");
      return kError;
   }
   Int_t fileCount = 0; 
   TMessage *retMess;
   if (goodName == -1) { 
      
      
      TMessage nameMess(kPROOF_DATASETS);
      nameMess << Int_t(kCheckDataSetName);
      nameMess << TString(dataSetName);
      Broadcast(nameMess);
      master->Recv(retMess);
      Collect(); 
      if (retMess->What() == kMESS_NOTOK) {
         
         while (goodName == -1 && !overwriteNoDataSet) {
            Printf("Dataset %s already exist. ",
                   dataSetName);
            Printf("Do you want to overwrite it[Yes/No/Append]?");
            TString answer;
            answer.ReadToken(cin);
            if (!strncasecmp(answer.Data(), "y", 1)) {
               goodName = 1;
            } else if (!strncasecmp(answer.Data(), "n", 1)) {
               goodName = 0;
            } else if (!strncasecmp(answer.Data(), "a", 1)) {
               goodName = 1;
               appendToDataSet = kTRUE;
            }
         }
      }
      else if (retMess->What() == kMESS_OK)
         goodName = 1;
      else
         Error("UploadDataSet", "unrecongnized message type: %d!",
            retMess->What());
      delete retMess;
   } 
   if (goodName == 1) {  
      
      char *relativeDestDir = Form("%s/%s/",
                                   gSystem->GetUserInfo()->fUser.Data(),
                                   desiredDest?desiredDest:"");
                                   
      relativeDestDir = CollapseSlashesInPath(relativeDestDir);
      TString dest = Form("%s/%s", GetDataPoolUrl(), relativeDestDir);
      delete[] relativeDestDir;
      
      TList *fileList = new TList();
      TIter next(files);
      while (TFileInfo *fileInfo = ((TFileInfo*)next())) {
         TUrl *fileUrl = fileInfo->GetFirstUrl();
         if (gSystem->AccessPathName(fileUrl->GetUrl()) == kFALSE) {
            
            
            const char *ent = gSystem->BaseName(fileUrl->GetFile());
            Int_t goodFileName = 1;
            if (!overwriteAll &&
               gSystem->AccessPathName(Form("%s/%s", dest.Data(), ent), kFileExists)
                  == kFALSE) {  
               goodFileName = -1;
               while (goodFileName == -1 && !overwriteAll && !overwriteNone) {
                  Printf("File %s already exists. ", Form("%s/%s", dest.Data(), ent));
                  Printf("Do you want to overwrite it [Yes/No/all/none]?");
                  TString answer;
                  answer.ReadToken(cin);
                  if (!strncasecmp(answer.Data(), "y", 1))
                     goodFileName = 1;
                  else if (!strncasecmp(answer.Data(), "all", 3))
                     overwriteAll = kTRUE;
                  else if (!strncasecmp(answer.Data(), "none", 4))
                     overwriteNone = kTRUE;
                  else if (!strncasecmp(answer.Data(), "n", 1))
                     goodFileName = 0;
               }
            } 
            
            if (goodFileName == 1 || overwriteAll) {
               
               Printf("Uploading %s to %s/%s",
                      fileUrl->GetUrl(), dest.Data(), ent);
               if (TFile::Cp(fileUrl->GetUrl(), Form("%s/%s", dest.Data(), ent))) {
                  fileList->Add(new TFileInfo(Form("%s/%s", dest.Data(), ent)));
               } else
                  Error("UploadDataSet", "file %s was not copied", fileUrl->GetUrl());
            } else {  
               fileList->Add(new TFileInfo(Form("%s/%s", dest.Data(), ent)));
               if (skippedFiles && &skippedFiles) {
                  
                  
                  skippedFiles->Add(new TFileInfo(fileUrl->GetUrl()));
               }
            }
         } 
      } 
      if ((fileCount = fileList->GetSize()) == 0) {
         Printf("No files were copied. The dataset will not be saved");
      } else {
         if (CreateDataSet(dataSetName, fileList,
                     appendToDataSet?kAppend:kOverwriteDataSet) <= 0) {
            Error("UploadDataSet", "Error while saving dataset!");
            fileCount = kError;
         }
      }
      fileList->SetOwner();
      delete fileList;
   } else if (overwriteNoDataSet) {
      Printf("Dataset %s already exists", dataSetName);
      return kDataSetExists;
   } 
   return fileCount;
}
Int_t TProof::UploadDataSet(const char *dataSetName,
                            const char *files,
                            const char *desiredDest,
                            Int_t opt,
                            TList *skippedFiles)
{
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   TList *fileList = new TList();
   void *dataSetDir = gSystem->OpenDirectory(gSystem->DirName(files));
   const char* ent;
   TString filesExp(gSystem->BaseName(files));
   filesExp.ReplaceAll("*",".*");
   TRegexp rg(filesExp);
   while ((ent = gSystem->GetDirEntry(dataSetDir))) {
      TString entryString(ent);
      if (entryString.Index(rg) != kNPOS) {
         
         
         
         TUrl *url = new TUrl(Form("%s/%s",
                                   gSystem->DirName(files), ent), kTRUE);
         if (gSystem->AccessPathName(url->GetUrl(), kReadPermission) == kFALSE)
            fileList->Add(new TFileInfo(url->GetUrl()));
         delete url;
      } 
   } 
   Int_t fileCount;
   if ((fileCount = fileList->GetSize()) == 0)
      Printf("No files match your selection. The dataset will not be saved");
   else
      fileCount = UploadDataSet(dataSetName, fileList, desiredDest,
                                opt, skippedFiles);
   fileList->SetOwner();
   delete fileList;
   return fileCount;
}
Int_t TProof::UploadDataSetFromFile(const char *dataset, const char *file,
                                    const char *dest, Int_t opt)
{
   
   
   
   
   
   Int_t fileCount = 0;
   ifstream f;
   f.open(gSystem->ExpandPathName(file), ifstream::out);
   if (f.is_open()) {
      while (f.good()) {
         TString line;
         line.ReadToDelim(f);
         if (fileCount == 0) {
            
            fileCount += UploadDataSet(dataset, line.Data(), dest, opt);
         } else 
            fileCount += UploadDataSet(dataset, line.Data(), dest,
                                       opt | kAppend);
      }
      f.close();
   } else {
      Error("UploadDataSetFromFile", "unable to open the specified file");
      return -1;
   }
   return fileCount;
}
Int_t TProof::CreateDataSet(const char *dataSetName,
                              TList *files,
                              Int_t opt)
{
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   
   if (strchr(dataSetName, '/')) {
      if (strstr(dataSetName, "public") != dataSetName) {
         Error("CreateDataSet",
               "Name of public dataset should start with public/");
         return kError;
      }
   }
   if (opt & kOverwriteDataSet && opt & kAppend
       || opt & kNoOverwriteDataSet && opt & kAppend
       || opt & kNoOverwriteDataSet && opt & kOverwriteDataSet
       || opt & kAskUser && opt & (kOverwriteDataSet |
                                   kNoOverwriteDataSet |
                                   kAppend)) {
      Error("CreateDataSet", "you specified contradicting options.");
      return kError;
   }
   if (opt & kOverwriteAllFiles || opt & kOverwriteNoFiles) {
      Error("CreateDataSet", "you specified unsupported options.");
      return kError;
   }
   
   Int_t goodName = (opt & (kOverwriteDataSet | kAppend)) ? 1 : -1;
   Int_t appendToDataSet = (opt & kAppend) ? kTRUE : kFALSE;
   Int_t overwriteNoDataSet = (opt & kNoOverwriteDataSet) ? kTRUE : kFALSE;
   TSocket *master;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("CreateDataSet", "No connection to the master!");
      return kError;
   }
   Int_t fileCount = 0; 
   
   TMessage *retMess;
   if (goodName == -1) { 
      
      
      TMessage nameMess(kPROOF_DATASETS);
      nameMess << Int_t(kCheckDataSetName);
      nameMess << TString(dataSetName);
      Broadcast(nameMess);
      master->Recv(retMess);
      Collect(); 
      if (retMess->What() == kMESS_NOTOK) {
         
         while (goodName == -1 && !overwriteNoDataSet) {
            Printf("Dataset %s already exists. ",
                   dataSetName);
            Printf("Do you want to overwrite it[Yes/No/Append]?");
            TString answer;
            answer.ReadToken(cin);
            if (!strncasecmp(answer.Data(), "y", 1)) {
               goodName = 1;
            } else if (!strncasecmp(answer.Data(), "n", 1)) {
               goodName = 0;
            } else if (!strncasecmp(answer.Data(), "a", 1)) {
               goodName = 1;
               appendToDataSet = kTRUE;
            }
         }
      }
      else if (retMess->What() == kMESS_OK)
         goodName = 1;
      else
         Error("CreateDataSet", "unrecongnized message type: %d!",
            retMess->What());
      delete retMess;
   } 
   if (goodName == 1) {
      if ((fileCount = files->GetSize()) == 0) {
         Printf("No files specified!");
      } else {
         TMessage mess(kPROOF_DATASETS);
         if (appendToDataSet)
            mess << Int_t(kAppendDataSet);
         else
            mess << Int_t(kCreateDataSet);
         mess << TString(dataSetName);
         mess.WriteObject(files);
         Broadcast(mess);
         
         if (master->Recv(retMess) <= 0) {
            Error("CreateDataSet", "No response form the master");
            fileCount = -1;
         } else {
            if (retMess->What() == kMESS_NOTOK) {
               Printf("Dataset was not saved.");
               fileCount = -1;
            } else if (retMess->What() != kMESS_OK)
               Error("CreateDataSet",
                     "Unexpected message type: %d", retMess->What());
            delete retMess;
         }
      }
   } else if (overwriteNoDataSet) {
      Printf("Dataset %s already exists", dataSetName);
      Collect();
      return kDataSetExists;
   } 
   Collect();
   return fileCount;
}
TList *TProof::GetDataSets(const char *dir)
{
   
   
   
   
   TSocket *master;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("GetDataSets", "No connection to the master!");
      return 0;
   }
   if (dir) {
      
      if (strstr(dir, "public") != dir && strchr(dir, '~') != dir) {
         
         Error("GetDataSets",
               "directory should be of form '[~userName/]public'");
         return 0;
      }
   }
   TMessage mess(kPROOF_DATASETS);
   mess << Int_t(kGetDataSets);
   mess << TString(dir?dir:"");
   Broadcast(mess);
   TMessage *retMess;
   master->Recv(retMess);
   TList *dataSetList = 0;
   if (retMess->What() == kMESS_OBJECT) {
      dataSetList = (TList*)(retMess->ReadObject(TList::Class()));
      if (!dataSetList)
         Error("GetDataSets", "Error receiving list of datasets");
   } else
      Printf("The dataset directory could not be open");
   Collect();
   delete retMess;
   return dataSetList;
}
void TProof::ShowDataSets(const char *dir)
{
   
   
   
   
   
   TList *dataSetList;
   if ((dataSetList = GetDataSets(dir))) {
      if (dir)
         Printf("DataSets in %s :", dir);
      else
         Printf("Existing DataSets:");
      TIter next(dataSetList);
      while (TObjString *obj = (TObjString*)next())
         Printf("%s", obj->GetString().Data());
      dataSetList->SetOwner();
      delete dataSetList;
   } else
      Printf("Error getting a list of datasets");
}
TList *TProof::GetDataSet(const char *dataset)
{
   
   
   TSocket *master;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("GetDataSet", "No connection to the master!");
      return 0;
   }
   TMessage nameMess(kPROOF_DATASETS);
   nameMess << Int_t(kGetDataSet);
   nameMess << TString(dataset);
   if (Broadcast(nameMess) < 0)
      Error("GetDataSet", "Sending request failed");
   TMessage *retMess;
   master->Recv(retMess);
   TList *fileList = 0;
   if (retMess->What() == kMESS_OK) {
      if (!(fileList = (TList*)(retMess->ReadObject(TList::Class()))))
         Error("GetDataSet", "Error reading list of files");
   } else if (retMess->What() != kMESS_NOTOK)
      Error("GetDataSet", "Wrong message type %d", retMess->What());
   Collect();
   delete retMess;
   return fileList;
}
void TProof::ShowDataSet(const char *dataset)
{
   
   TList *fileList;
   if ((fileList = GetDataSet(dataset))) {
      if (fileList->GetSize()) {
         
         Printf("Files in %s:", dataset);
         TIter next(fileList);
         while (TFileInfo *obj = (TFileInfo*)next())
            Printf("%s", obj->GetFirstUrl()->GetUrl());
      } else
         Printf("There are no files in %s", dataset);
      delete fileList;
   }
   else
      Printf("No such dataset: %s", dataset);
}
Int_t TProof::RemoveDataSet(const char *dataSet)
{
   
   
   
   TSocket *master;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("RemoveDataSet", "No connection to the master!");
      return kError;
   }
   TMessage nameMess(kPROOF_DATASETS);
   nameMess << Int_t(kRemoveDataSet);
   nameMess << TString(dataSet);
   if (Broadcast(nameMess) < 0)
      Error("RemoveDataSet", "Sending request failed");
   TMessage *mess;
   TString errorMess;
   master->Recv(mess);
   Collect();
   if (mess->What() != kMESS_OK) {
      if (mess->What() != kMESS_NOTOK)
         Error("RemoveDataSet", "unrecongnized message type: %d!",
               mess->What());
      delete mess;
      return -1;
   } else {
      delete mess;
      return 0;
   }
}
Int_t TProof::VerifyDataSet(const char *dataSet)
{
   
   
   Int_t nMissingFiles = 0;
   TSocket *master;
   if (fActiveSlaves->GetSize())
      master = ((TSlave*)(fActiveSlaves->First()))->GetSocket();
   else {
      Error("VerifyDataSet", "No connection to the master!");
      return kError;
   }
   TMessage nameMess(kPROOF_DATASETS);
   nameMess << Int_t(kVerifyDataSet);
   nameMess << TString(dataSet);
   if (Broadcast(nameMess) < 0)
      Error("VerifyDataSet", "Sending request failed");
   TMessage *mess;
   master->Recv(mess);
   Collect();
   if (mess->What() == kMESS_OK) {
      TList *missingFiles;
      missingFiles = (TList*)(mess->ReadObject(TList::Class()));
      nMissingFiles = missingFiles->GetSize();
      if (nMissingFiles == 0)
         Printf("The files from %s dataset are all present on the cluster",
                dataSet);
      else {
         Printf("The following files are missing from dataset %s ", dataSet);
         Printf("at the moment:");
         TIter next(missingFiles);
         TFileInfo* fileInfo;
         while ((fileInfo = (TFileInfo*)next())) {
            Printf("\t%s", fileInfo->GetFirstUrl()->GetUrl());
         }
      }
      missingFiles->SetOwner();
      delete missingFiles;
   } else if (mess->What() == kMESS_NOTOK) {
      Printf("ValidateDataSet: no such dataset %s", dataSet);
      delete mess;
      return  -1;
   } else
      Fatal("ValidateDataSet", "unknown message type %d", mess->What());
   delete mess;
   return nMissingFiles;
}
void TProof::InterruptCurrentMonitor()
{
   
   if (fCurrentMonitor)
      fCurrentMonitor->Interrupt();
}
void TProof::ActivateWorker(const char *ord)
{
   
   
   
   
   
   
   ModifyWorkerLists(ord, kTRUE);
}
void TProof::DeactivateWorker(const char *ord)
{
   
   
   
   
   
   
   ModifyWorkerLists(ord, kFALSE);
}
void TProof::ModifyWorkerLists(const char *ord, Bool_t add)
{
   
   
   
   
   
   
   
   if (!ord || strlen(ord) <= 0) {
      Info("ModifyWorkerLists",
           "An ordinal number - e.g. \"0.4\" or \"*\" for all - is required as input");
      return;
   }
   Bool_t fw = kTRUE;    
   Bool_t rs = kFALSE;   
   
   TList *in = (add) ? fInactiveSlaves : fActiveSlaves;
   TList *out = (add) ? fActiveSlaves : fInactiveSlaves;
   if (IsMaster()) {
      fw = IsEndMaster() ? kFALSE : kTRUE;
      
      if (in->GetSize() > 0) {
         TIter nxw(in);
         TSlave *wrk = 0;
         while ((wrk = (TSlave *) nxw())) {
            if (ord[0] == '*' || !strncmp(wrk->GetOrdinal(), ord, strlen(ord))) {
               
               if (!out->FindObject(wrk)) {
                  out->Add(wrk);
                  if (add)
                     fActiveMonitor->Add(wrk->GetSocket());
               }
               
               in->Remove(wrk);
               if (!add) {
                  fActiveMonitor->Remove(wrk->GetSocket());
                  wrk->SetStatus(TSlave::kInactive);
               } else
                  wrk->SetStatus(TSlave::kActive);
               
               fw = kFALSE;
               
               rs = kTRUE;
               
               if (ord[0] != '*')
                  break;
            }
         }
      }
   }
   
   if (rs)
      FindUniqueSlaves();
   
   Int_t action = (add) ? (Int_t) kActivateWorker : (Int_t) kDeactivateWorker;
   if (fw) {
      TMessage mess(kPROOF_WORKERLISTS);
      mess << action << TString(ord);
      Broadcast(mess);
      Collect();
   }
}
TProof *TProof::Open(const char *cluster, const char *conffile,
                                   const char *confdir, Int_t loglevel)
{
   
   
   
   
   
   
   
   
   
   
   const char *pn = "TProof::Open";
   
   
   if (!cluster) {
      TPluginManager *pm = gROOT->GetPluginManager();
      if (!pm) {
         ::Error(pn, "plugin manager not found");
         return 0;
      }
      if (gROOT->IsBatch()) {
         ::Error(pn, "we are in batch mode, cannot show PROOF Session Viewer");
         return 0;
      }
      
      TPluginHandler *sv = pm->FindHandler("TSessionViewer", "");
      if (!sv) {
         ::Error(pn, "no plugin found for TSessionViewer");
         return 0;
      }
      if (sv->LoadPlugin() == -1) {
         ::Error(pn, "plugin for TSessionViewer could not be loaded");
         return 0;
      }
      sv->ExecPlugin(0);
      return 0;
   } else {
      
      TUrl u(cluster);
      
      TString o(u.GetOptions());
      Int_t locid = -1;
      Bool_t create = kFALSE;
      if (o.Length() > 0) {
         if (o.BeginsWith("N",TString::kIgnoreCase)) {
            create = kTRUE;
         } else if (o.IsDigit()) {
            locid = o.Atoi();
         }
         u.SetOptions("");
      }
      
      TProofMgr *mgr = TProofMgr::Create(u.GetUrl());
      TProof *proof = 0;
      if (mgr && mgr->IsValid()) {
         
         
         Bool_t attach = (create || mgr->IsProofd()) ? kFALSE : kTRUE;
         if (attach) {
            TProofDesc *d = 0;
            if (locid < 0)
               
               d = (TProofDesc *) mgr->QuerySessions("")->First();
            else
               d = (TProofDesc *) mgr->GetProofDesc(locid);
            if (d) {
               proof = (TProof*) mgr->AttachSession(d->GetLocalId());
               if (!proof || !proof->IsValid()) {
                  if (locid)
                     ::Error(pn, "new session could not be attached");
                  SafeDelete(proof);
               }
            }
         }
         
         if (!proof) {
            proof = (TProof*) mgr->CreateSession(conffile, confdir, loglevel);
            if (!proof || !proof->IsValid()) {
               ::Error(pn, "new session could not be created");
               SafeDelete(proof);
            }
         }
      }
      return proof;
   }
}
TProofMgr *TProof::Mgr(const char *url)
{
   
   
   if (!url)
      return (TProofMgr *)0;
   
   return TProofMgr::Create(url);
}
void TProof::Reset(const char *url)
{
   
   if (url) {
      TProofMgr *mgr = TProof::Mgr(url);
      if (mgr && mgr->IsValid())
         mgr->Reset();
      else
         ::Error("TProof::Reset",
                 "unable to initialize a valid manager instance");
   }
}
const TList *TProof::GetEnvVars()
{
   
   return fgProofEnvList;
}
void TProof::AddEnvVar(const char *name, const char *value)
{
   
   
   if (gDebug > 0) ::Info("TProof::AddEnvVar","%s=%s", name, value);
   if (fgProofEnvList == 0) {
      
      fgProofEnvList = new TList;
      fgProofEnvList->SetOwner();
   } else {
      
      TObject *o = fgProofEnvList->FindObject(name);
      if (o != 0) {
         fgProofEnvList->Remove(o);
      }
   }
   fgProofEnvList->Add(new TNamed(name, value));
}
void TProof::DelEnvVar(const char *name)
{
   
   
   if (fgProofEnvList == 0) return;
   TObject *o = fgProofEnvList->FindObject(name);
   if (o != 0) {
      fgProofEnvList->Remove(o);
   }
}
void TProof::ResetEnvVars()
{
   
   
   if (fgProofEnvList == 0) return;
   SafeDelete(fgProofEnvList);
}
void TProof::SaveWorkerInfo()
{
   
   
   
   
   if (!IsMaster())
      return;
   
   if (!gProofServ) {
      Error("SaveWorkerInfo","gProofServ undefined");
      return;
   }
   
   const_cast<TProof*>(this)->AskStatistics();
   
   if (!fSlaves && !fBadSlaves) {
      Warning("SaveWorkerInfo","all relevant worker lists is undefined");
      return;
   }
   
   TString fnwrk = Form("%s/.workers",
                        gSystem->DirName(gProofServ->GetSessionDir()));
   FILE *fwrk = fopen(fnwrk.Data(),"w");
   if (!fwrk) {
      Error("SaveWorkerInfo",
            "cannot open %s for writing (errno: %d)", fnwrk.Data(), errno);
      return;
   }
   
   TIter nxa(fSlaves);
   TSlave *wrk = 0;
   while ((wrk = (TSlave *) nxa())) {
      Int_t status = (fBadSlaves && fBadSlaves->FindObject(wrk)) ? 0 : 1;
      
      fprintf(fwrk,"%s@%s:%d %d %s %s.log\n",
                   wrk->GetUser(), wrk->GetName(), wrk->GetPort(), status,
                   wrk->GetOrdinal(), wrk->GetWorkDir());
   }
   
   fclose(fwrk);
   
   return;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, TString &value)
{
   
   
   
   TObject *obj = c->FindObject(par);
   if (obj) {
      TNamed *par = dynamic_cast<TNamed*>(obj);
      if (par) {
         value = par->GetTitle();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Long_t &value)
{
   
   
   
   TObject *obj = c->FindObject(par);
   if (obj) {
      TParameter<Long_t> *par = dynamic_cast<TParameter<Long_t>*>(obj);
      if (par) {
         value = par->GetVal();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Long64_t &value)
{
   
   
   
   TObject *obj = c->FindObject(par);
   if (obj) {
      TParameter<Long64_t> *par = dynamic_cast<TParameter<Long64_t>*>(obj);
      if (par) {
         value = par->GetVal();
         return 0;
      }
   }
   return -1;
}
Int_t TProof::GetParameter(TCollection *c, const char *par, Double_t &value)
{
   
   
   
   TObject *obj = c->FindObject(par);
   if (obj) {
      TParameter<Double_t> *par = dynamic_cast<TParameter<Double_t>*>(obj);
      if (par) {
         value = par->GetVal();
         return 0;
      }
   }
   return -1;
}
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.