// @(#)root/proof:$Name:  $:$Id: TProof.cxx,v 1.73 2004/11/24 07:41:32 brun Exp $
// Author: Fons Rademakers   13/02/97

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProof                                                               //
//                                                                      //
// This class controls a Parallel ROOT Facility, PROOF, cluster.        //
// It fires the slave servers, it keeps track of how many slaves are    //
// running, it keeps track of the slaves running status, it broadcasts  //
// messages to all slaves, it collects results, etc.                    //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include <fcntl.h>
#include <errno.h>
#ifdef WIN32
#   include <io.h>
#   include <sys/stat.h>
#   include <sys/types.h>
#else
#   include <unistd.h>
#endif

#include "TProof.h"
#include "TSortedList.h"
#include "TSlave.h"
#include "TMonitor.h"
#include "TMessage.h"
#include "TSystem.h"
#include "TError.h"
#include "TUrl.h"
#include "TFTP.h"
#include "TROOT.h"
#include "TH1.h"
#include "TProofPlayer.h"
#include "TDSet.h"
#include "TEnv.h"
#include "TPluginManager.h"
#include "TCondor.h"
#include "Riostream.h"

//----- PROOF Interrupt signal handler -----------------------------------------------
//______________________________________________________________________________
class TProofInterruptHandler : public TSignalHandler {
   // Signal handler registered for kSigInterrupt; its Notify() forwards
   // the event to the TProof session it was created for.
public:
   TProofInterruptHandler(TProof *p)
      : TSignalHandler(kSigInterrupt, kFALSE), fProof(p) { }
   Bool_t Notify();

private:
   TProof *fProof;   // session to interrupt; never deleted by this class
};

//______________________________________________________________________________
Bool_t TProofInterruptHandler::Notify()
{
   // Called when the interrupt signal fires: request a hard interrupt on
   // the associated PROOF session. Always returns kTRUE.

   fProof->Interrupt(TProof::kHardInterrupt);

   return kTRUE;
}

//----- Input handler for messages from TProofServ -----------------------------
//______________________________________________________________________________
class TProofInputHandler : public TFileHandler {
   // File handler watching the descriptor of a slave/master socket; when
   // input is available it hands the socket to TProof::HandleAsyncInput().
public:
   TProofInputHandler(TProof *p, TSocket *s)
      : TFileHandler(s->GetDescriptor(), 1), fSocket(s), fProof(p) { }
   Bool_t Notify();
   Bool_t ReadNotify() { return Notify(); }

private:
   TSocket *fSocket;   // socket whose descriptor is being watched
   TProof  *fProof;    // session that processes the asynchronous input
};

//______________________________________________________________________________
Bool_t TProofInputHandler::Notify()
{
   // Pass the watched socket to the PROOF session for asynchronous
   // input handling. Always returns kTRUE.

   fProof->HandleAsyncInput(fSocket);

   return kTRUE;
}


//------------------------------------------------------------------------------

ClassImp(TSlaveInfo)

//______________________________________________________________________________
void TSlaveInfo::Print(Option_t *opt) const
{
   // Print one line describing this slave. The option acts as a filter:
   //   "active"    - print only if the slave is active
   //   "notactive" - print only if the slave is not active
   //   "bad"       - print only if the slave is bad
   // Any other value (including a null pointer) prints unconditionally.

   const char *filter = opt ? opt : "";

   // Bail out when the requested filter does not match our status.
   if (fStatus != kActive    && strcmp(filter, "active") == 0)    return;
   if (fStatus != kNotActive && strcmp(filter, "notactive") == 0) return;
   if (fStatus != kBad       && strcmp(filter, "bad") == 0)       return;

   // Human readable status label.
   TString stat;
   if (fStatus == kActive)
      stat = "active";
   else if (fStatus == kBad)
      stat = "bad";
   else
      stat = "not active";

   cout << "Slave: "          << fOrdinal
        << "  hostname: "     << fHostName
        << "  perf index: "   << fPerfIndex
        << "  "               << stat
        << endl;
}



//------------------------------------------------------------------------------

ClassImp(TProof)


//______________________________________________________________________________
 TProof::TProof(const char *masterurl, const char *conffile,
                const char *confdir, Int_t loglevel)
{
   // Create a PROOF environment. Starting PROOF involves either connecting
   // to a master server, which in turn will start a set of slave servers, or
   // directly starting as master server (if masterurl = ""). Masterurl is of
   // the form: proof://host[:port] or proofs://host[:port]. Conffile is
   // the name of the config file describing the remote PROOF cluster
   // (this argument allows you to describe different cluster
   // configurations); a null or empty value selects kPROOF_ConfFile.
   // Confdir is the directory where the config file and other PROOF related
   // files are (like motd and noproof files); a null or empty value selects
   // kPROOF_ConfDir. Loglevel is the log level.

   const char *cfgFile = (conffile && *conffile) ? conffile : kPROOF_ConfFile;
   const char *cfgDir  = (confdir  && *confdir)  ? confdir  : kPROOF_ConfDir;

   // Only one PROOF session can be open at a time (for the time being):
   // shut down the previous one before starting ours.
   if (gProof) {
      Warning("TProof", "closing currently open PROOF session");
      gProof->Close();
   }

   Init(masterurl, cfgFile, cfgDir, loglevel);

   // From now on this object is the global PROOF session.
   gProof = this;
}

//______________________________________________________________________________
 TProof::~TProof()
{
   // Clean up PROOF environment: close the slave connections, release the
   // helper objects created by Init() and unregister this session from
   // gProof if it is the current one.

   Close();

   // Interrupt handler and slave bookkeeping lists
   SafeDelete(fIntHandler);
   SafeDelete(fSlaves);
   SafeDelete(fActiveSlaves);
   SafeDelete(fUniqueSlaves);
   SafeDelete(fBadSlaves);

   // Socket monitors
   SafeDelete(fAllMonitor);
   SafeDelete(fActiveMonitor);
   SafeDelete(fUniqueMonitor);

   // Remaining helpers
   SafeDelete(fCondor);
   SafeDelete(fSlaveInfo);
   SafeDelete(fFeedback);

   if (gProof == this)
      gProof = 0;
}

//______________________________________________________________________________
 Int_t TProof::Init(const char *masterurl, const char *conffile,
                    const char *confdir, Int_t loglevel)
{
   // Start the PROOF environment. Starting PROOF involves either connecting
   // to a master server, which in turn will start a set of slave servers, or
   // directly starting as master server (if masterurl is null or empty). For
   // a description of the arguments see the TProof ctor. Returns the size of
   // the active slave list after going parallel, or 0 in case of error, in
   // which case fValid remains false.

   Assert(gSystem);

   fValid = kFALSE;

   // A missing URL means "run as master"; a bare host name gets the
   // default proof:// protocol prefix.
   TUrl *u;
   if (!masterurl || !*masterurl)
      u = new TUrl("proof://__master__");
   else if (strstr(masterurl, "://"))
      u = new TUrl(masterurl);
   else
      u = new TUrl(Form("proof://%s", masterurl));

   fUser           = u->GetUser();
   fMaster         = u->GetHost();
   fPort           = u->GetPort();
   fConfDir        = confdir;
   fConfFile       = conffile;
   fWorkDir        = gSystem->WorkingDirectory();
   fLogLevel       = loglevel;
   fProtocol       = kPROOF_Protocol;
   fMasterServ     = fMaster == "__master__" ? kTRUE : kFALSE;
   fSendGroupView  = kTRUE;
   fImage          = fMasterServ ? "" : "<local>";
   fIntHandler     = 0;
   fProgressDialog = 0;
   fStatus         = 0;
   fParallel       = 0;
   fSlaveInfo      = 0;
   fFeedback       = 0;
   fPlayer         = 0;
   fCondor         = 0;
   fSecContext     = 0;
   fUrlProtocol    = u->GetProtocol();

   delete u;

   // global logging
   gProofDebugLevel = fLogLevel;
   gProofDebugMask  = TProofDebug::kAll;

   // sort slaves by descending performance index
   fSlaves        = new TSortedList(kSortDescending);
   fActiveSlaves  = new TList;
   fUniqueSlaves  = new TList;
   fBadSlaves     = new TList;
   fAllMonitor    = new TMonitor;
   fActiveMonitor = new TMonitor;
   fUniqueMonitor = new TMonitor;

   // If this is a master server, find the config file and start slave
   // servers as specified in the config file
   if (IsMaster()) {

      // Look for $HOME/.<conffile> first, then <confdir>/proof/etc/<conffile>.
      // The $HOME candidate is skipped when HOME is not set, so a null
      // pointer is never passed to the "%s" conversion.
      TString fconf;
      Bool_t  found = kFALSE;
      const char *home = gSystem->Getenv("HOME");
      if (home) {
         fconf.Form("%s/.%s", home, conffile);
         PDB(kGlobal,2) Info("Init", "checking PROOF config file %s", fconf.Data());
         found = !gSystem->AccessPathName(fconf, kReadPermission);
      }
      if (!found) {
         fconf.Form("%s/proof/etc/%s", confdir, conffile);
         PDB(kGlobal,2) Info("Init", "checking PROOF config file %s", fconf.Data());
         if (gSystem->AccessPathName(fconf, kReadPermission)) {
            Error("Init", "no PROOF config file found");
            return 0;
         }
      }

      PDB(kGlobal,1) Info("Init", "using PROOF config file: %s", fconf.Data());

      Bool_t staticSlaves = gEnv->GetValue("Proofd.StaticSlaves", kTRUE);
      PDB(kGlobal,1) Info("Init", "using StaticSlaves: %s", staticSlaves ? "kTRUE" : "kFALSE");

      if (staticSlaves) {
         // The access check above can succeed and the open still fail;
         // report it instead of later calling fclose() on a null stream.
         FILE *pconf = fopen(fconf, "r");
         if (!pconf) {
            Error("Init", "cannot open PROOF config file %s", fconf.Data());
            return 0;
         }

         fConfFile = fconf;

         // read the config file
         char line[1024];
         TString host = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
         int  ord = 0;

         // check for valid master line
         while (fgets(line, sizeof(line), pconf)) {
            char word[12][128];
            if (line[0] == '#') continue;   // skip comment lines
            // Each conversion is limited to 127 characters so that a long
            // token cannot overflow its 128 byte buffer.
            int nword = sscanf(line,
                "%127s %127s %127s %127s %127s %127s %127s %127s %127s %127s %127s %127s",
                word[0], word[1], word[2], word[3], word[4], word[5],
                word[6], word[7], word[8], word[9], word[10], word[11]);

            // see if master may run on this node, accept both old "node"
            // and new "master" lines; only the first matching line is used
            if (nword >= 2 &&
                (!strcmp(word[0], "node") || !strcmp(word[0], "master")) &&
                !fImage.Length()) {
               TInetAddress a = gSystem->GetHostByName(word[1]);
               if (!host.CompareTo(a.GetHostName()) ||
                   !strcmp(word[1], "localhost")) {
                  char *image = word[1];
                  if (nword > 2 && !strncmp(word[2], "image=", 6))
                     image = word[2]+6;
                  fImage = image;
               }
            }
         }

         if (fImage.Length() == 0) {
            fclose(pconf);
            Error("Init", "no appropriate master line found in %s", fconf.Data());
            return 0;
         }

         // check for valid slave lines and start slaves
         rewind(pconf);
         while (fgets(line, sizeof(line), pconf)) {
            char word[12][128];
            if (line[0] == '#') continue;   // skip comment lines
            int nword = sscanf(line,
                "%127s %127s %127s %127s %127s %127s %127s %127s %127s %127s %127s %127s",
                word[0], word[1], word[2], word[3], word[4], word[5],
                word[6], word[7], word[8], word[9], word[10], word[11]);

            // find all slave servers, accept both "slave" and "worker" lines
            if (nword >= 2 &&
                (!strcmp(word[0], "slave") || !strcmp(word[0], "worker"))) {
               int perfidx  = 100;
               int sport    = fPort;

               // optional perf=, image= and port= settings override the
               // defaults; the image defaults to the host name
               const char *image = word[1];
               for (int i = 2; i < nword; i++) {

                  if (!strncmp(word[i], "perf=", 5))
                     perfidx = atoi(word[i]+5);
                  if (!strncmp(word[i], "image=", 6))
                     image = word[i]+6;
                  if (!strncmp(word[i], "port=", 5))
                     sport = atoi(word[i]+5);

               }

               // Get slave FQDN ...
               // (the result is not used further in this function)
               TString SlaveFqdn;
               TInetAddress SlaveAddr = gSystem->GetHostByName((const char *)word[1]);
               if (SlaveAddr.IsValid()) {
                  SlaveFqdn = SlaveAddr.GetHostName();
                  if (SlaveFqdn == "UnNamedHost")
                     SlaveFqdn = SlaveAddr.GetHostAddress();
               }

               // create slave server
               TSlave *slave = new TSlave(word[1], sport, ord++, perfidx,
                                          image, this);

               // every configured slave is kept in fSlaves; those that
               // failed to start are additionally listed in fBadSlaves
               fSlaves->Add(slave);
               if (slave->IsValid()) {
                  fAllMonitor->Add(slave->GetSocket());
                  slave->SetInputHandler(new TProofInputHandler(this,
                                         slave->GetSocket()));
               } else {
                  fBadSlaves->Add(slave);
               }
               PDB(kGlobal,3)
                  Info("Init","slave on host %s created and added to list", word[1]);
            }
         }

         fclose(pconf);

      } else {
         // non static config

         fCondor = new TCondor;

         fImage = fCondor->GetImage(gSystem->HostName());
         if (fImage.Length() == 0) {
            Error("Init", "no appropriate master line found in %s for system %s",
                  fconf.Data(), gSystem->HostName());
            return 0;
         }

         TList *claims = fCondor->Claim(9999, "dummy");

         TIter next(claims);
         TCondorSlave *c;
         Int_t ord = 0;
         while((c = (TCondorSlave*) next()) != 0) {

            // Get slave FQDN ...
            // (the result is not used further in this function)
            TString SlaveFqdn;
            TInetAddress SlaveAddr = gSystem->GetHostByName((const char *)c->fHostname);
            if (SlaveAddr.IsValid()) {
               SlaveFqdn = SlaveAddr.GetHostName();
               if (SlaveFqdn == "UnNamedHost")
                  SlaveFqdn = SlaveAddr.GetHostAddress();
            }

            TSlave *slave = new TSlave(c->fHostname, c->fPort, ord++,
                                       c->fPerfIdx, c->fImage, this);

            fSlaves->Add(slave);
            if (slave->IsValid()) {
               fAllMonitor->Add(slave->GetSocket());
               slave->SetInputHandler(new TProofInputHandler(this,
                                      slave->GetSocket()));
            } else {
               fBadSlaves->Add(slave);
            }
         }
      }

   } else {

      // create master server
      TSlave *slave = new TSlave(fMaster, fPort, 0, 100, "master", this);

      if (slave->IsValid()) {
         // check protocol compatibility
         // protocol 1 is not supported anymore
         if (fProtocol == 1) {
            Error("Init", "client and remote protocols not compatible (%d and %d)",
                  fProtocol, kPROOF_Protocol);
            delete slave;
            return 0;
         }

         fSlaves->Add(slave);
         fAllMonitor->Add(slave->GetSocket());
         Collect(slave);
         if (fStatus == -99) {
            Error("Init", "not allowed to connect to PROOF master server");
            return 0;
         }

         if (!slave->IsValid()) {
            // The slave has already been registered above: unlink it from
            // the lists before deleting it, otherwise Close() would delete
            // it a second time through fSlaves->Delete().
            fSlaves->Remove(slave);
            fBadSlaves->Remove(slave);
            delete slave;
            Error("Init", "failed to setup connection with PROOF master server");
            return 0;
         }

         slave->SetInputHandler(new TProofInputHandler(this, slave->GetSocket()));

         fIntHandler = new TProofInterruptHandler(this);
         fIntHandler->Add();

         // the progress dialog plugin is only looked up in interactive mode
         if (!gROOT->IsBatch()) {
            if ((fProgressDialog = gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
               if (fProgressDialog->LoadPlugin() == -1)
                  fProgressDialog = 0;
         }

      } else {
         delete slave;
         Error("Init", "failed to connect to a PROOF master server");
         return 0;
      }
   }

   // we are now properly initialized
   fValid = kTRUE;

   // de-activate monitor (will be activated in Collect)
   fAllMonitor->DeActivateAll();

   // by default go into parallel mode
   GoParallel(9999);

   // send relevant initial state to slaves
   SendInitialState();

   SetActive(kFALSE);

   if (IsValid())
      gROOT->GetListOfSockets()->Add(this);

   return fActiveSlaves->GetSize();
}

//______________________________________________________________________________
 void TProof::Close(Option_t *)
{
   // Close all open slave servers. The option argument is ignored.
   // Does nothing when the slave list has not been created.

   if (!fSlaves)
      return;

   if (fIntHandler)
      fIntHandler->Remove();

   // A local client tells master and slaves to stop.
   if (!IsMaster())
      Interrupt(kShutdownInterrupt, kAll);

   // The secondary lists only reference slaves held by fSlaves, so they
   // are emptied without deleting; fSlaves then deletes the slave objects.
   fActiveSlaves->Clear("nodelete");
   fUniqueSlaves->Clear("nodelete");
   fBadSlaves->Clear("nodelete");
   fSlaves->Delete();
}

//______________________________________________________________________________
 TSlave *TProof::FindSlave(TSocket *s) const
{
   // Return the valid slave whose socket is s, or 0 if there is none.

   TIter nextSlave(fSlaves);

   for (TSlave *cand = (TSlave *)nextSlave(); cand; cand = (TSlave *)nextSlave()) {
      if (!cand->IsValid())
         continue;
      if (cand->GetSocket() == s)
         return cand;
   }

   return 0;
}

//______________________________________________________________________________
 void TProof::FindUniqueSlaves()
{
   // Add to the fUniqueSlave list the active slaves that have a unique
   // (user) file system image. This information is used to transfer files
   // only once to nodes that share a file system (an image).

   fUniqueSlaves->Clear();
   fUniqueMonitor->RemoveAll();

   TIter next(fActiveSlaves);

   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (fImage == sl->fImage) continue;
      TIter next2(fUniqueSlaves);
      TSlave *sl2;
      Int_t   add = fUniqueSlaves->IsEmpty() ? 1 : 0;
      while ((sl2 = (TSlave *)next2())) {
         if (sl->fImage == sl2->fImage) {
            add = 0;
            break;
         }
         add++;
      }
      if (add) {
         fUniqueSlaves->Add(sl);
         fUniqueMonitor->Add(sl->GetSocket());
      }
   }

   // will be actiavted in Collect()
   fUniqueMonitor->DeActivateAll();
}

//______________________________________________________________________________
 Int_t TProof::GetNumberOfSlaves() const
{
   // Return the size of the list of all slaves (fSlaves), i.e. the
   // slaves created from the configuration.

   return fSlaves->GetSize();
}

//______________________________________________________________________________
 Int_t TProof::GetNumberOfActiveSlaves() const
{
   // Return the size of the active slave list, i.e. the number of slaves
   // that are valid and part of the current computing group.

   return fActiveSlaves->GetSize();
}

//______________________________________________________________________________
 Int_t TProof::GetNumberOfUniqueSlaves() const
{
   // Return the size of the unique slave list, i.e. the number of active
   // slaves that each have a different user file system image.

   return fUniqueSlaves->GetSize();
}

//______________________________________________________________________________
 Int_t TProof::GetNumberOfBadSlaves() const
{
   // Return the number of bad slaves. These are slaves that were in the
   // config file but refused to start up, or that died during the PROOF
   // session.

   return fBadSlaves->GetSize();
}

//______________________________________________________________________________
 void TProof::AskStatus()
{
   // Send a kPROOF_STATUS request to the active slaves and collect their
   // replies. Does nothing when the session is not valid.

   if (IsValid()) {
      Broadcast(kPROOF_STATUS, kActive);
      Collect(kActive);
   }
}

//______________________________________________________________________________
 void TProof::Interrupt(EUrgent type, ESlaves list)
{
   // Send interrupt OOB byte to master or slave servers in the selected
   // list (kAll, kActive or kUnique). For a hard interrupt the input
   // stream of each server is flushed up to the OOB mark before the log
   // file is collected; for a shutdown interrupt nothing is read back.
   // Does nothing when the session is not valid or the list is empty.

   if (!IsValid()) return;

   char oobc = (char) type;

   TList *slaves = 0;
   if (list == kAll)    slaves = fSlaves;
   if (list == kActive) slaves = fActiveSlaves;
   if (list == kUnique) slaves = fUniqueSlaves;

   // An unknown list selector leaves the pointer null: nothing to do.
   if (!slaves || slaves->GetSize() == 0) return;

   const int kBufSize = 1024;
   char waste[kBufSize];

   TSlave *sl;
   TIter   next(slaves);

   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         TSocket *s = sl->GetSocket();

         // Send one byte out-of-band message to server
         if (s->SendRaw(&oobc, 1, kOob) <= 0) {
            Error("Interrupt", "error sending oobc to slave %d", sl->GetOrdinal());
            continue;
         }

         if (type == kHardInterrupt) {
            char  oob_byte;
            // nch is initialized so that it is never read as garbage
            int   n, nch = 0, nbytes = 0, nloop = 0;

            // Receive the OOB byte
            while ((n = s->RecvRaw(&oob_byte, 1, kOob)) < 0) {
               if (n == -2) {   // EWOULDBLOCK
                  //
                  // The OOB data has not yet arrived: flush the input stream
                  //
                  // In some systems (Solaris) regular recv() does not return upon
                  // receipt of the oob byte, which makes the below call to recv()
                  // block indefinitely if there are no other data in the queue.
                  // FIONREAD ioctl can be used to check if there are actually any
                  // data to be flushed.  If not, wait for a while for the oob byte
                  // to arrive and try to read it again.
                  //
                  if (s->GetOption(kBytesToRead, nch) < 0) {
                     Error("Interrupt", "error querying socket of slave %d",
                           sl->GetOrdinal());
                     break;
                  }
                  if (nch == 0) {
                     gSystem->Sleep(1000);
                     continue;
                  }

                  if (nch > kBufSize) nch = kBufSize;
                  n = s->RecvRaw(waste, nch);
                  if (n <= 0) {
                     Error("Interrupt", "error receiving waste from slave %d",
                           sl->GetOrdinal());
                     break;
                  }
                  nbytes += n;
               } else if (n == -3) {   // EINVAL
                  //
                  // The OOB data has not arrived yet
                  //
                  gSystem->Sleep(100);
                  if (++nloop > 100) {  // 10 seconds time-out
                     Error("Interrupt", "server %d does not respond", sl->GetOrdinal());
                     break;
                  }
               } else {
                  Error("Interrupt", "error receiving OOB from server %d",
                        sl->GetOrdinal());
                  break;
               }
            }

            //
            // Continue flushing the input socket stream until the OOB
            // mark is reached
            //
            while (1) {
               int atmark = 0;

               // A failing query would otherwise leave atmark/nch undefined
               // and keep this loop spinning: give up on this socket.
               if (s->GetOption(kAtMark, atmark) < 0) {
                  Error("Interrupt", "error querying socket of slave %d",
                        sl->GetOrdinal());
                  break;
               }

               if (atmark)
                  break;

               // find out number of bytes to read before atmark
               if (s->GetOption(kBytesToRead, nch) < 0) {
                  Error("Interrupt", "error querying socket of slave %d",
                        sl->GetOrdinal());
                  break;
               }
               if (nch == 0) {
                  gSystem->Sleep(1000);
                  continue;
               }

               if (nch > kBufSize) nch = kBufSize;
               n = s->RecvRaw(waste, nch);
               if (n <= 0) {
                  Error("Interrupt", "error receiving waste (2) from slave %d",
                        sl->GetOrdinal());
                  break;
               }
               nbytes += n;
            }
            if (nbytes > 0) {
               if (IsMaster())
                  Printf("*** Slave %s:%d synchronized: %d bytes discarded",
                         sl->GetName(), sl->GetOrdinal(), nbytes);
               else
                  Printf("*** PROOF synchronized: %d bytes discarded", nbytes);
            }

            // Get log file from master or slave after a hard interrupt
            Collect(sl);

         } else if (type == kSoftInterrupt) {

            // Get log file from master or slave after a soft interrupt
            Collect(sl);

         } else if (type == kShutdownInterrupt) {

            ; // nothing expected to be returned

         } else {

            // Unexpected message, just receive log file
            Collect(sl);

         }
      }
   }
}

//______________________________________________________________________________
 Int_t TProof::GetParallel() const
{
   // Returns number of slaves active in parallel mode: on a master this
   // is the number of active slaves, on a client the cached fParallel
   // value. Returns -1 when the session is not valid.

   if (!IsValid())
      return -1;

   return IsMaster() ? GetNumberOfActiveSlaves() : fParallel;
}

//______________________________________________________________________________
 TList *TProof::GetSlaveInfo()
{
   // Returns the list of TSlaveInfo objects describing the slaves, or 0
   // when the session is not valid. The list is owned by this object and
   // is emptied and refilled on every call. On a master the list is built
   // from the local slave lists; on a client a kPROOF_GETSLAVEINFO request
   // is broadcast and the replies are collected.

   if (!IsValid()) return 0;

   if (fSlaveInfo != 0) {
      fSlaveInfo->Delete();
   } else {
      fSlaveInfo = new TList;
      fSlaveInfo->SetOwner();
   }

   if (!IsMaster()) {
      Broadcast(kPROOF_GETSLAVEINFO);
      Collect();
      return fSlaveInfo;
   }

   TIter nextSlave(GetListOfSlaves());
   TSlave *slave;

   while ((slave = (TSlave *) nextSlave())) {
      TSlaveInfo *info = new TSlaveInfo(slave->GetOrdinal(),
                                        slave->GetName(),
                                        slave->GetPerfIdx());

      TSlave *other;

      // flag as active when the ordinal shows up in the active list ...
      TIter nextActive(GetListOfActiveSlaves());
      while ((other = (TSlave *) nextActive())) {
         if (info->GetOrdinal() == other->GetOrdinal()) {
            info->SetStatus(TSlaveInfo::kActive);
            break;
         }
      }

      // ... and as bad when it shows up in the bad list (checked last,
      // so "bad" wins over "active")
      TIter nextBad(GetListOfBadSlaves());
      while ((other = (TSlave *) nextBad())) {
         if (info->GetOrdinal() == other->GetOrdinal()) {
            info->SetStatus(TSlaveInfo::kBad);
            break;
         }
      }

      fSlaveInfo->Add(info);
   }

   return fSlaveInfo;
}

//______________________________________________________________________________
 Int_t TProof::Broadcast(const TMessage &mess, TList *slaves)
{
   // Broadcast a message to all valid slaves in the specified list.
   // Returns the number of slaves the message was successfully sent to;
   // a slave for which the send fails is marked bad. Returns 0 for a null
   // or empty list and -1 when the session is not valid.

   if (!IsValid()) return -1;

   // a null list is treated like an empty one instead of being dereferenced
   if (!slaves || slaves->GetSize() == 0) return 0;

   int   nsent = 0;
   TIter next(slaves);

   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->GetSocket()->Send(mess) == -1)
            MarkBad(sl);
         else
            nsent++;
      }
   }

   return nsent;
}

//______________________________________________________________________________
 Int_t TProof::Broadcast(const TMessage &mess, ESlaves list)
{
   // Broadcast a message to all slaves in the list selected by the
   // ESlaves value (all, active or unique slaves). Returns the number of
   // slaves the message was successfully sent to. Returns -1 in case of
   // error.

   TList *target = 0;

   switch (list) {
      case kAll:    target = fSlaves;       break;
      case kActive: target = fActiveSlaves; break;
      case kUnique: target = fUniqueSlaves; break;
      default:                              break;
   }

   return Broadcast(mess, target);
}

//______________________________________________________________________________
 Int_t TProof::Broadcast(const char *str, Int_t kind, TList *slaves)
{
   // Broadcast a character string buffer to all slaves in the specified
   // list. The message is created with kind as its "what" field; a null
   // str sends the message without a string payload. Returns the number
   // of slaves the message was sent to, or -1 in case of error.

   TMessage msg(kind);

   if (str != 0)
      msg.WriteString(str);

   return Broadcast(msg, slaves);
}

//______________________________________________________________________________
 Int_t TProof::Broadcast(const char *str, Int_t kind, ESlaves list)
{
   // Broadcast a character string buffer to all slaves in the list
   // selected by the ESlaves value. The message is created with kind as
   // its "what" field; a null str sends the message without a string
   // payload. Returns the number of slaves the message was sent to, or
   // -1 in case of error.

   TMessage msg(kind);

   if (str != 0)
      msg.WriteString(str);

   return Broadcast(msg, list);
}

//______________________________________________________________________________
 Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, TList *slaves)
{
   // Broadcast an object to all slaves in the specified list. The object
   // is written into a TMessage whose "what" field is set to kind.
   // Returns the number of slaves the message was sent to, or -1 in case
   // of error.

   TMessage msg(kind);
   msg.WriteObject(obj);

   return Broadcast(msg, slaves);
}

//______________________________________________________________________________
 Int_t TProof::BroadcastObject(const TObject *obj, Int_t kind, ESlaves list)
{
   // Broadcast an object to all slaves in the list selected by the
   // ESlaves value. The object is written into a TMessage whose "what"
   // field is set to kind. Returns the number of slaves the message was
   // sent to, or -1 in case of error.

   TMessage msg(kind);
   msg.WriteObject(obj);

   return Broadcast(msg, list);
}

//______________________________________________________________________________
 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
{
   // Broadcast a raw buffer of specified length to all valid slaves in
   // the specified list. Returns the number of slaves the buffer was
   // successfully sent to; a slave for which the send fails is marked bad.
   // Returns 0 for a null or empty list and -1 when the session is not
   // valid.

   if (!IsValid()) return -1;

   // a null list is treated like an empty one instead of being dereferenced
   if (!slaves || slaves->GetSize() == 0) return 0;

   int   nsent = 0;
   TIter next(slaves);

   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (sl->IsValid()) {
         if (sl->GetSocket()->SendRaw(buffer, length) == -1)
            MarkBad(sl);
         else
            nsent++;
      }
   }

   return nsent;
}

//______________________________________________________________________________
 Int_t TProof::BroadcastRaw(const void *buffer, Int_t length, ESlaves list)
{
   // Broadcast a raw buffer of specified length to all slaves in the list
   // selected by the ESlaves value. Returns the number of slaves the
   // buffer was sent to. Returns -1 in case of error.

   TList *target = 0;

   switch (list) {
      case kAll:    target = fSlaves;       break;
      case kActive: target = fActiveSlaves; break;
      case kUnique: target = fUniqueSlaves; break;
      default:                              break;
   }

   return BroadcastRaw(buffer, length, target);
}

//______________________________________________________________________________
 Int_t TProof::Collect(const TSlave *sl)
{
   // Collect responses from the single slave sl: only its socket is
   // activated in the all-slaves monitor. Returns 0 right away when the
   // slave is not valid, otherwise the result of collecting on the monitor.

   if (!sl->IsValid())
      return 0;

   fAllMonitor->DeActivateAll();
   fAllMonitor->Activate(sl->GetSocket());

   return Collect(fAllMonitor);
}

//______________________________________________________________________________
 Int_t TProof::Collect(TList *slaves)
{
   // Collect responses from the valid slaves in the given list: their
   // sockets are the only ones activated in the all-slaves monitor.
   // Returns the result of collecting on that monitor.

   fAllMonitor->DeActivateAll();

   TIter nextSlave(slaves);
   for (TSlave *cur = (TSlave *) nextSlave(); cur; cur = (TSlave *) nextSlave()) {
      if (!cur->IsValid())
         continue;
      fAllMonitor->Activate(cur->GetSocket());
   }

   return Collect(fAllMonitor);
}

//______________________________________________________________________________
 Int_t TProof::Collect(ESlaves list)
{
   // Collect responses from the slave servers in the list selected by the
   // ESlaves value, using the monitor that belongs to that list with all
   // its sockets activated. Returns the result of collecting on that
   // monitor, or 0 when the selector matches no known list.

   TMonitor *mon = 0;
   if (list == kAll)    mon = fAllMonitor;
   if (list == kActive) mon = fActiveMonitor;
   if (list == kUnique) mon = fUniqueMonitor;

   // an unknown selector leaves the pointer null: nothing to collect
   if (!mon) return 0;

   mon->ActivateAll();

   return Collect(mon);
}

//______________________________________________________________________________
 Int_t TProof::Collect(TMonitor *mon)
{
   // Collect responses from the slave servers whose sockets are active in
   // the monitor mon. Returns the number of messages processed. Can be 0
   // if there are no active slaves.
   //
   // The a-sync input handlers are switched off while collecting and
   // switched on again afterwards. fStatus, fBytesRead, fRealTime and
   // fCpuTime are reset before the first message is read. The loop ends as
   // soon as the monitor has no active socket left: a socket is
   // de-activated on kPROOF_LOGDONE, kPROOF_STATUS and kPROOF_GETSLAVEINFO,
   // and a slave whose socket fails is marked bad.

   fStatus = 0;
   if (!mon->GetActive()) return 0;

   DeActivateAsyncInput();

   int cnt = 0, loop = 1;

   fBytesRead = 0;
   fRealTime  = 0.0;
   fCpuTime   = 0.0;

   while (loop) {
      char      str[512];
      TMessage *mess = 0;
      TSocket  *s;
      TSlave   *sl;
      TObject  *obj;
      Int_t     what;

      s = mon->Select();

      if (s->Recv(mess) < 0) {
         MarkBad(s);
         // without this check the next Select() would wait on a monitor
         // that has no active socket left
         if (!mon->GetActive()) loop = 0;
         continue;
      }

      if (!mess) {
         // we get here in case the remote server died
         MarkBad(s);
         if (!mon->GetActive()) loop = 0;
         continue;
      }

      what = mess->What();

      switch (what) {

         case kMESS_OBJECT:
            // histograms are merged into an existing histogram with the
            // same name in the current directory, or attached to it
            obj = mess->ReadObject(mess->GetClass());
            if (obj && obj->InheritsFrom(TH1::Class())) {
               TH1 *h = (TH1*)obj;
               h->SetDirectory(0);
               TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
               if (horg)
                  horg->Add(h);
               else
                  h->SetDirectory(gDirectory);
            }
            break;

         case kPROOF_FATAL:
            MarkBad(s);
            if (!mon->GetActive()) loop = 0;
            break;

         case kPROOF_GETOBJECT:
            // the slave asks for an object from the current directory
            mess->ReadString(str, sizeof(str));
            obj = gDirectory->Get(str);
            if (obj)
               s->SendObject(obj);
            else
               s->Send(kMESS_NOTOK);
            break;

         case kPROOF_GETPACKET:
            {
               TDSetElement *elem = 0;
               if (fPlayer) {
                  sl = FindSlave(s);
                  elem = fPlayer->GetNextPacket(sl, mess);
               }

               TMessage answ(kPROOF_GETPACKET);

               // kTRUE + packet description, or just kFALSE when there is
               // no packet to hand out
               if (elem != 0) {
                  answ << kTRUE
                       << TString(elem->GetFileName())
                       << TString(elem->GetDirectory())
                       << TString(elem->GetObjName())
                       << elem->GetFirst()
                       << elem->GetNum();
               } else {
                  answ << kFALSE;
               }

               s->Send(answ);
            }
            break;

         case kPROOF_LOGFILE:
            {
               Int_t size;
               (*mess) >> size;
               RecvLogFile(s, size);
            }
            break;

         case kPROOF_LOGDONE:
            (*mess) >> fStatus >> fParallel;
            PDB(kGlobal,2) Info("Collect:kPROOF_LOGDONE","status %d  parallel %d",
               fStatus, fParallel);
            mon->DeActivate(s);
            if (!mon->GetActive()) loop = 0;
            break;

         case kPROOF_STATUS:
            if (IsMaster()) {
               // accumulate the per-slave statistics in the session totals
               sl = FindSlave(s);
               (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
                       >> sl->fWorkDir;
               fBytesRead += sl->fBytesRead;
               fRealTime  += sl->fRealTime;
               fCpuTime   += sl->fCpuTime;
            } else {
               (*mess) >> fParallel;
            }
            mon->DeActivate(s);
            if (!mon->GetActive()) loop = 0;
            break;

         case kPROOF_OUTPUTLIST:
            {
               PDB(kGlobal,2) Info("Collect:kPROOF_OUTPUTLIST","Enter");
               TList *out = (TList *) mess->ReadObject(TList::Class());
               if (out) {
                  out->SetOwner();
                  if (fPlayer)
                     fPlayer->StoreOutput(out); // Adopts the list
                  else
                     delete out;                // nobody to adopt the list
               }
            }
            break;

         case kPROOF_FEEDBACK:
            {
               PDB(kGlobal,2) Info("Collect:kPROOF_FEEDBACK","Enter");
               TList *out = (TList *) mess->ReadObject(TList::Class());
               if (out) {
                  out->SetOwner();
                  if (fPlayer) {
                     sl = FindSlave(s);
                     fPlayer->StoreFeedback(sl, out); // Adopts the list
                  } else {
                     delete out;                      // nobody to adopt the list
                  }
               }
            }
            break;

         case kPROOF_AUTOBIN:
            {
               TString name;
               Double_t xmin, xmax, ymin, ymax, zmin, zmax;

               (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;

               if (fPlayer != 0 ) {
                  fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
               }

               // send the (possibly updated) limits back to the slave
               TMessage answ(kPROOF_AUTOBIN);

               answ << name << xmin << xmax << ymin << ymax << zmin << zmax;

               s->Send(answ);
            }
            break;

         case kPROOF_PROGRESS:
            {
               PDB(kGlobal,2) Info("Collect:kPROOF_PROGRESS","Enter");
               Long64_t total, processed;

               (*mess) >> total >> processed;

               if (fPlayer)
                  fPlayer->Progress(total, processed);
            }
            break;

         case kPROOF_GETSLAVEINFO:
            {
               PDB(kGlobal,2) Info("Collect:kPROOF_GETSLAVEINFO","Enter");

               (*mess) >> fSlaveInfo;
               mon->DeActivate(s);
               if (!mon->GetActive()) loop = 0;
            }
            break;

         default:
            Error("Collect", "unknown command received from slave (%d)", what);
            break;
      }

      cnt++;
      delete mess;
   }

   // make sure group view is up to date
   SendGroupView();

   ActivateAsyncInput();

   return cnt;
}

//______________________________________________________________________________
 void TProof::ActivateAsyncInput()
{
   // Activate the a-sync input handler.

   TIter next(fSlaves);
   TSlave *sl;

   while ((sl = (TSlave*) next()))
      if (sl->GetInputHandler())
         sl->GetInputHandler()->Add();
}

//______________________________________________________________________________
 void TProof::DeActivateAsyncInput()
{
   // De-actiate a-sync input handler.

   TIter next(fSlaves);
   TSlave *sl;

   while ((sl = (TSlave*) next()))
      if (sl->GetInputHandler())
         sl->GetInputHandler()->Remove();
}

//______________________________________________________________________________
 void TProof::HandleAsyncInput(TSocket *sl)
{
   // Handle input coming from the master server (when this is a client)
   // or from a slave server (when this is a master server). This is mainly
   // for a-synchronous communication; normally the server messages that
   // follow a PROOF command are handled directly by Collect().
   // Only kPROOF_PING is understood here, any other message type is
   // reported as an error.

   TMessage *mess;
   if (sl->Recv(mess) <= 0)
      return;                // do something more intelligent here

   const Int_t what = mess->What();

   // a ping needs no action (it is already acknowledged)
   if (what != kPROOF_PING)
      Error("HandleAsyncInput", "unknown command %d", what);

   delete mess;
}

//______________________________________________________________________________
 void TProof::MarkBad(TSlave *sl)
{
   // Declare the slave server sl bad: take it out of the active list,
   // recompute the unique slaves, put it on the bad slave list, remove its
   // socket from the all and active monitors and close it. A new group
   // view is requested by setting fSendGroupView.

   // list bookkeeping
   fActiveSlaves->Remove(sl);
   FindUniqueSlaves();
   fBadSlaves->Add(sl);

   // stop monitoring the socket before the slave is closed
   TSocket *sock = sl->GetSocket();
   fAllMonitor->Remove(sock);
   sock = sl->GetSocket();
   fActiveMonitor->Remove(sock);

   sl->Close();

   fSendGroupView = kTRUE;
}

//______________________________________________________________________________
 void TProof::MarkBad(TSocket *s)
{
   // Declare bad the slave that FindSlave() returns for socket s.
   // See MarkBad(TSlave*) for what that implies.

   MarkBad(FindSlave(s));
}

//______________________________________________________________________________
 Int_t TProof::Ping()
{
   // Ping PROOF via the active slaves list; shorthand for Ping(kActive).
   // Returns 1 if master server responded.

   return Ping(kActive);
}

//______________________________________________________________________________
 Int_t TProof::Ping(ESlaves list)
{
   // Ping the PROOF slaves in the given list by broadcasting a
   // kPROOF_PING message that requests an acknowledge (kMESS_ACK).
   // Returns the value returned by Broadcast().

   TMessage ping(kPROOF_PING | kMESS_ACK);
   return Broadcast(ping, list);
}

//______________________________________________________________________________
 void TProof::Print(Option_t *option) const
{
   // Print status of PROOF cluster.
   // On a client the connection parameters are printed and, when the
   // session is valid, the print request is forwarded with SendPrint().
   // On a master the slave status is refreshed with AskStatus() before the
   // session summary is printed; if option contains "a" and there are
   // slaves, every slave in fSlaves is printed as well.

   if (!IsMaster()) {
      Printf("Connected to:             %s (%s)", GetMaster(),
                                             IsValid() ? "valid" : "invalid");
      Printf("Port number:              %d", GetPort());
      Printf("User:                     %s", GetUser());

      // This branch is also reached for an invalid session, so neither the
      // security context nor an active slave can be taken for granted.
      if (fSecContext)
         Printf("Security context:         %s", fSecContext->AsString());
      else
         Printf("Security context:         Error - No connection");

      TSlave *sl = (TSlave *)fActiveSlaves->First();
      if (sl && sl->GetSocket())
         Printf("Proofd protocol version:  %d", sl->GetSocket()->GetRemoteProtocol());
      else
         Printf("Proofd protocol version:  Error - No connection");

      Printf("Client protocol version:  %d", GetClientProtocol());
      Printf("Remote protocol version:  %d", GetRemoteProtocol());
      Printf("Log level:                %d", GetLogLevel());
      if (IsValid())
         ((TProof*)this)->SendPrint(option);
   } else {
      // AskStatus() is not const, hence the cast
      ((TProof*)this)->AskStatus();
      if (IsParallel())
         Printf("*** Master server (parallel mode, %d slaves):",
                GetNumberOfActiveSlaves());
      else
         Printf("*** Master server (sequential mode):");

      Printf("Master host name:         %s", gSystem->HostName());
      Printf("Port number:              %d", GetPort());
      Printf("User:                     %s", GetUser());
      Printf("Protocol version:         %d", GetClientProtocol());
      Printf("Image name:               %s", GetImage());
      Printf("Working directory:        %s", gSystem->WorkingDirectory());
      Printf("Config directory:         %s", GetConfDir());
      Printf("Config file:              %s", GetConfFile());
      Printf("Log level:                %d", GetLogLevel());
      Printf("Number of slaves:         %d", GetNumberOfSlaves());
      Printf("Number of active slaves:  %d", GetNumberOfActiveSlaves());
      Printf("Number of unique slaves:  %d", GetNumberOfUniqueSlaves());
      Printf("Number of bad slaves:     %d", GetNumberOfBadSlaves());
      Printf("Total MB's processed:     %.2f", float(GetBytesRead())/(1024*1024));
      Printf("Total real time used (s): %.3f", GetRealTime());
      Printf("Total CPU time used (s):  %.3f", GetCpuTime());
      if (TString(option).Contains("a") && GetNumberOfSlaves()) {
         Printf("List of slaves:");
         fSlaves->ForEach(TSlave,Print)(option);
      }
   }
}

//______________________________________________________________________________
 Int_t TProof::Process(TDSet *set, const char *selector, Option_t *option,
                      Long64_t nentries, Long64_t first, TEventList *evl)
{
   // Process a data set (TDSet) using the specified selector (.C) file.
   // A remote player is created on first use and, if a progress dialog
   // plugin is set, it is started before the processing is handed to the
   // player. Returns -1 if the session is not valid, otherwise the value
   // returned by the player.

   if (!IsValid())
      return -1;

   if (fPlayer == 0)
      fPlayer = new TProofPlayerRemote(this);

   if (fProgressDialog) {
      // 5 plugin arguments follow the count
      fProgressDialog->ExecPlugin(5, this, selector,
                                  set->GetListOfElements()->GetSize(),
                                  first, nentries);
   }

   return fPlayer->Process(set, selector, option, nentries, first, evl);
}

//______________________________________________________________________________
 Int_t TProof::DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option,
                         Long64_t nentries, Long64_t first)
{
   // Hand a draw request for a data set (TDSet), given by the expression
   // varexp and the selection, to the player. A remote player is created
   // on first use. Returns -1 if the session is not valid, otherwise the
   // value returned by the player.

   if (!IsValid())
      return -1;

   if (fPlayer == 0)
      fPlayer = new TProofPlayerRemote(this);

   return fPlayer->DrawSelect(set, varexp, selection, option, nentries, first);
}

//______________________________________________________________________________
 void TProof::StopProcess(Bool_t abort)
{
   // Forward a stop request, with the abort flag unchanged, to the player.
   // Nothing is done when there is no player.

   if (!fPlayer)
      return;

   fPlayer->StopProcess(abort);
}

//______________________________________________________________________________
 void TProof::AddInput(TObject *obj)
{
   // Add an object that might be needed during the processing of the
   // selector (see Process()) to the input list of the player. A remote
   // player is created on first use.

   if (fPlayer == 0)
      fPlayer = new TProofPlayerRemote(this);

   fPlayer->AddInput(obj);
}

//______________________________________________________________________________
 void TProof::ClearInput()
{
   // Clear the input object list of the player. The feedback list, when
   // set, is added again right after clearing. Nothing is done when there
   // is no player.

   if (!fPlayer)
      return;

   fPlayer->ClearInput();
   if (fFeedback)
      AddInput(fFeedback);
}

//______________________________________________________________________________
 TObject *TProof::GetOutput(const char *name)
{
   // Get the object with the given name that has been produced during the
   // processing (see Process()). The lookup is done by the player;
   // returns 0 when there is no player.

   return fPlayer ? fPlayer->GetOutput(name) : 0;
}

//______________________________________________________________________________
 TList *TProof::GetOutputList()
{
   // Get the list with all objects created during processing (see
   // Process()) from the player. Returns 0 when there is no player.

   return fPlayer ? fPlayer->GetOutputList() : 0;
}

//______________________________________________________________________________
 void TProof::RecvLogFile(TSocket *s, Int_t size)
{
   // Receive the log file of the slave with socket s and copy it to
   // stdout. "size" is the number of bytes announced by the slave; the data
   // is read in chunks of at most kMAXBUF bytes. Receiving stops early when
   // the socket reports an error or delivers no more data.

   const Int_t kMAXBUF = 16384;  //32768  //16384  //65536;
   char buf[kMAXBUF];

   Int_t  left, r;
   Long_t filesize = 0;

   while (filesize < size) {
      left = Int_t(size - filesize);
      if (left > kMAXBUF)
         left = kMAXBUF;
      r = s->RecvRaw(buf, left);
      if (r < 0) {
         Error("RecvLogFile", "error during receiving log file");
         break;
      }
      if (r == 0) {
         // Nothing was received although data is still outstanding.
         // Without this exit filesize would never advance and the loop
         // would spin forever.
         Error("RecvLogFile", "connection closed while receiving log file");
         break;
      }

      char *p = buf;

      filesize += r;
      while (r) {
         Int_t w;

         w = write(fileno(stdout), p, r);

         if (w < 0) {
            // give up on this chunk only, the rest of the log file is
            // still read from the socket
            SysError("RecvLogFile", "error writing to stdout");
            break;
         }
         r -= w;
         p += w;
      }
   }
}

//______________________________________________________________________________
 Int_t TProof::SendGroupView()
{
   // Send to all active slave servers the current slave group size and
   // their unique id, as the string "<id> <size>" in a kPROOF_GROUPVIEW
   // message. Only done on a master and only when fSendGroupView is set.
   // Returns the number of active slaves, 0 when nothing had to be sent
   // and -1 when the session is not valid.

   if (!IsValid()) return -1;
   if (!IsMaster() || !fSendGroupView) return 0;
   fSendGroupView = kFALSE;

   TIter nextSlave(fActiveSlaves);

   const int size = GetNumberOfActiveSlaves();
   int  nbad = 0, id = 0;
   char view[32];

   while (TSlave *slave = (TSlave *) nextSlave()) {
      sprintf(view, "%d %d", id, size);
      if (slave->GetSocket()->Send(view, kPROOF_GROUPVIEW) != -1) {
         id++;
         continue;
      }
      // the slave could not be reached
      MarkBad(slave);
      nbad++;
   }

   // Send the group view again in case there was a change in the
   // group size due to a bad slave

   if (nbad) SendGroupView();

   return GetNumberOfActiveSlaves();
}

//______________________________________________________________________________
 Int_t TProof::Exec(const char *cmd)
{
   // Send a command to be executed on the PROOF master and/or slaves,
   // using the active slaves list; shorthand for Exec(cmd, kActive).
   // Command can be any legal command line command. Commands like
   // ".x file.C" or ".L file.C" will cause the file file.C to be sent
   // to the PROOF cluster. Returns -1 in case of error, >=0 in case of
   // success.

   return Exec(cmd, kActive);
}

//______________________________________________________________________________
 Int_t TProof::Exec(const char *cmd, ESlaves list)
{
   // Send a command to be executed on the PROOF master and/or slaves in
   // the given list. Command can be any legal command line command.
   // For commands starting with ".L", ".x" or ".X" the macro file is
   // looked up in the macro path first: with unique slaves the file is
   // sent to the cluster before the command, without unique slaves the
   // command is sent with the full path of the file instead.
   // Returns -1 in case of error, >=0 in case of success.

   if (!IsValid()) return -1;

   TString s = cmd;
   s = s.Strip(TString::kBoth);

   if (!s.Length()) return 0;

   // anything but a macro command goes out as is
   const Bool_t macro = s.BeginsWith(".L") || s.BeginsWith(".x") ||
                        s.BeginsWith(".X");
   if (!macro)
      return SendCommand(cmd, list);

   // check for macro file and make sure the file is available on all slaves
   TString file = s(2, s.Length());
   TString acm, arg, io;
   TString filename = gSystem->SplitAclicMode(file, acm, arg, io);
   char *fn = gSystem->Which(TROOT::GetMacroPath(), filename, kReadPermission);
   if (!fn) {
      Error("Exec", "macro %s not found", file.Data());
      return -1;
   }

   if (GetNumberOfUniqueSlaves() <= 0) {
      // no file transfer, send the command with the path found above
      TString scmd = s(0,3) + fn;
      Int_t n = SendCommand(scmd, list);
      delete [] fn;
      return n;
   }

   if (SendFile(fn, kFALSE) < 0) {
      Error("Exec", "file %s could not be transfered", fn);
      delete [] fn;
      return -1;
   }
   delete [] fn;

   return SendCommand(cmd, list);
}

//______________________________________________________________________________
 Int_t TProof::SendCommand(const char *cmd, ESlaves list)
{
   // Send a command to be executed on the PROOF master and/or slaves in
   // the given list, as a kMESS_CINT message, and collect the responses.
   // Command can be any legal command line command, however commands
   // like ".x file.C" or ".L file.C" will not cause the file.C to be
   // transferred to the PROOF cluster. In that case use TProof::Exec().
   // Returns the status sent by the remote server as part of the
   // kPROOF_LOGDONE message. Typically this is the return code of the
   // command on the remote side. Returns -1 in case of error.

   if (!IsValid())
      return -1;

   Broadcast(cmd, kMESS_CINT, list);
   Collect(list);      // sets fStatus

   return fStatus;
}

//______________________________________________________________________________
 Int_t TProof::SendCurrentState(ESlaves list)
{
   // Transfer the current state of the master to the slave servers in
   // the given list. The state currently consists of the path of the
   // current directory, sent in a kPROOF_RESET message.
   // Returns the number of active slaves. Returns -1 in case of error.

   if (!IsValid())
      return -1;

   // Go to the new directory, reset the interpreter environment and
   // tell slave to delete all objects from its new current directory.
   const char *path = gDirectory->GetPath();
   Broadcast(path, kPROOF_RESET, list);

   return GetNumberOfActiveSlaves();
}

//______________________________________________________________________________
 Int_t TProof::SendInitialState()
{
   // Transfer the initial (i.e. current) state of the master to all
   // slave servers. Currently the initial state includes: log level
   // (sent together with the debug mask by SetLogLevel()).
   // Returns the number of active slaves. Returns -1 in case of error.

   if (!IsValid())
      return -1;

   SetLogLevel(fLogLevel, gProofDebugMask);

   return GetNumberOfActiveSlaves();
}

//______________________________________________________________________________
 Long_t TProof::CheckFile(const char *file, TSlave *slave)
{
   // Check if a file needs to be send to the slave. Use the following
   // algorithm:
   //   - check if file appears in file map
   //     - if yes, get file's modtime and check against time in map,
   //       if modtime not same get md5 and compare against md5 in map,
   //       if not same return size
   //     - if no, get file's md5 and modtime and store in file map, ask
   //       slave if file exists with specific md5, if yes return 0,
   //       if no return file's size
   // Returns size of file in case file needs to be send, returns 0 in case
   // file is already on remote and -1 in case of error. Errors are: the
   // file cannot be stat'ed, is empty, its md5 cannot be computed, or the
   // slave does not reply to the kPROOF_CHECKFILE request. In the last
   // case the file map entry is dropped so that the next call asks again.

   Long64_t size;
   Long_t id, flags, modtime;
   if (gSystem->GetPathInfo(file, &id, &size, &flags, &modtime) == 1) {
      Error("CheckFile", "cannot stat file %s", file);
      return -1;
   }
   if (size == 0) {
      Error("CheckFile", "empty file %s", file);
      return -1;
   }

   Bool_t sendto = kFALSE;

   // create slave based filename
   TString sn = slave->GetName();
   sn += ":";
   sn += slave->GetOrdinal();
   sn += ":";
   sn += gSystem->BaseName(file);

   // check if file is in map
   FileMap_t::const_iterator it;
   if ((it = fFileMap.find(sn)) != fFileMap.end()) {
      // file in map
      MD5Mod_t md = (*it).second;
      if (md.fModtime != modtime) {
         TMD5 *md5 = TMD5::FileChecksum(file);
         if (!md5) {
            Error("CheckFile", "cannot compute md5 checksum of file %s", file);
            return -1;
         }
         if ((*md5) != md.fMD5) {
            sendto       = kTRUE;
            md.fMD5      = *md5;
            md.fModtime  = modtime;
            fFileMap[sn] = md;
            // When on the master, the master and/or slaves may share
            // their file systems and cache. Therefore always make a
            // check for the file. If the file already exists with the
            // expected md5 the kPROOF_CHECKFILE command will cause the
            // file to be copied from cache to slave sandbox.
            if (IsMaster()) {
               sendto = kFALSE;
               TMessage mess(kPROOF_CHECKFILE);
               mess << TString(gSystem->BaseName(file)) << md.fMD5;
               slave->GetSocket()->Send(mess);

               TMessage *reply = 0;
               slave->GetSocket()->Recv(reply);
               if (!reply) {
                  // the slave never confirmed the new md5, do not keep it
                  Error("CheckFile", "no reply from slave %s:%d",
                        slave->GetName(), slave->GetOrdinal());
                  fFileMap.erase(sn);
                  delete md5;
                  return -1;
               }
               if (reply->What() != kPROOF_CHECKFILE)
                  sendto = kTRUE;
               delete reply;
            }
         }
         delete md5;
      }
   } else {
      // file not in map
      TMD5 *md5 = TMD5::FileChecksum(file);
      if (!md5) {
         Error("CheckFile", "cannot compute md5 checksum of file %s", file);
         return -1;
      }
      MD5Mod_t md;
      md.fMD5      = *md5;
      md.fModtime  = modtime;
      fFileMap[sn] = md;
      delete md5;
      TMessage mess(kPROOF_CHECKFILE);
      mess << TString(gSystem->BaseName(file)) << md.fMD5;
      slave->GetSocket()->Send(mess);

      TMessage *reply = 0;
      slave->GetSocket()->Recv(reply);
      if (!reply) {
         // the slave never confirmed the md5, do not keep it
         Error("CheckFile", "no reply from slave %s:%d",
               slave->GetName(), slave->GetOrdinal());
         fFileMap.erase(sn);
         return -1;
      }
      if (reply->What() != kPROOF_CHECKFILE)
         sendto = kTRUE;
      delete reply;
   }

   if (sendto)
      return size;

   return 0;
}

//______________________________________________________________________________
 Int_t TProof::SendFile(const char *file, Bool_t bin)
{
   // Send a file to the active master or slave servers. Returns number of
   // slaves the file was successfully sent to, maybe 0 in case master and
   // slaves have the same file system image, -1 in case of error. The bin
   // flag is passed to the receiver in the kPROOF_SENDFILE header, which
   // has the form "<basename> <bin> <size>".
   // For every valid slave CheckFile() decides whether the file has to be
   // transferred; a slave that cannot be reached is marked bad and is not
   // counted.

   if (!IsValid()) return -1;

   TList *slaves = fActiveSlaves;

   if (slaves->GetSize() == 0) return 0;

#ifndef R__WIN32
   Int_t fd = open(file, O_RDONLY);
#else
   Int_t fd = open(file, O_RDONLY | O_BINARY);
#endif
   if (fd < 0) {
      SysError("SendFile", "cannot open file %s", file);
      return -1;
   }

   const Int_t kMAXBUF = 32768;  //16384  //65536;
   char buf[kMAXBUF];
   Int_t nsl = 0;

   TIter next(slaves);
   TSlave *sl;
   while ((sl = (TSlave *)next())) {
      if (!sl->IsValid())
         continue;

      Long_t size = CheckFile(file, sl);
      if (size < 0) {
         // CheckFile() failed (and reported why); a header announcing a
         // negative size must not be sent
         close(fd);
         return -1;
      }

      // if on client and size==0 broadcast anyway the kPROOF_SENDFILE command
      // to the master so that the master can propagate the file to possibly
      // newly added slaves
      if (IsMaster() && size == 0)
         continue;

      PDB(kPackage,2)
         if (size > 0) {
            if (!nsl)
               Info("SendFile", "sending file %s to:", file);
            printf("   slave = %s:%d\n", sl->GetName(), sl->GetOrdinal());
         }

      sprintf(buf, "%s %d %ld", gSystem->BaseName(file), bin, size);
      if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
         MarkBad(sl);
         continue;
      }

      if (size == 0)
         continue;

      // the same descriptor is reused for every slave
      lseek(fd, 0, SEEK_SET);

      Bool_t sent = kTRUE;
      Int_t len;
      do {
         // retry reads that were interrupted by a signal
         while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
            TSystem::ResetErrno();

         if (len < 0) {
            SysError("SendFile", "error reading from file %s", file);
            Interrupt(kSoftInterrupt, kActive);
            close(fd);
            return -1;
         }

         if (sl->GetSocket()->SendRaw(buf, len) == -1) {
            SysError("SendFile", "error writing to slave %s:%d (now offline)",
                     sl->GetName(), sl->GetOrdinal());
            MarkBad(sl);
            sent = kFALSE;
            break;
         }

      } while (len > 0);

      // only count slaves that received the complete file
      if (sent)
         nsl++;
   }

   close(fd);

   return nsl;
}

//______________________________________________________________________________
 Int_t TProof::SendObject(const TObject *obj, ESlaves list)
{
   // Send an object, wrapped in a kMESS_OBJECT message, to the master or
   // slave servers in the given list. Returns the value returned by
   // Broadcast(), -1 when the session is not valid or obj is 0.

   if (!IsValid()) return -1;
   if (!obj)       return -1;

   TMessage objmess(kMESS_OBJECT);
   objmess.WriteObject(obj);

   return Broadcast(objmess, list);
}

//______________________________________________________________________________
 Int_t TProof::SendPrint(Option_t *option)
{
   // Send a print command (kPROOF_PRINT, with the given option) to the
   // active servers and collect their responses. Returns the value
   // returned by Collect(). Returns -1 in case of error.

   if (!IsValid())
      return -1;

   Broadcast(option, kPROOF_PRINT, kActive);

   const Int_t nresp = Collect(kActive);
   return nresp;
}

//______________________________________________________________________________
 void TProof::SetLogLevel(Int_t level, UInt_t mask)
{
   // Set the logging level and debug mask, locally (fLogLevel,
   // gProofDebugLevel, gProofDebugMask) and on all servers, which receive
   // them as the string "<level> <mask>" in a kPROOF_LOGLEVEL message.

   // local settings
   fLogLevel        = level;
   gProofDebugLevel = level;
   gProofDebugMask  = (TProofDebug::EProofDebugMask) mask;

   // remote settings
   char levelmask[32];
   sprintf(levelmask, "%d %u", level, mask);
   Broadcast(levelmask, kPROOF_LOGLEVEL, kAll);
}

//______________________________________________________________________________
 Int_t TProof::SetParallel(Int_t nodes)
{
   // Tell PROOF how many slaves to use in parallel. On a master the
   // active slaves are selected with GoParallel() and the current state is
   // sent to them; the result of SendCurrentState() is returned. On a
   // client the request is sent in a kPROOF_PARALLEL message and fParallel,
   // as set while collecting the responses, is returned.
   // Returns -1 in case of error.

   if (!IsValid()) return -1;

   if (!IsMaster()) {
      PDB(kGlobal,1) Info("SetParallel", "request %d node%s", nodes,
         nodes == 1 ? "" : "s");

      TMessage request(kPROOF_PARALLEL);
      request << nodes;
      Broadcast(request);
      Collect();

      PDB(kGlobal,1) Info("SetParallel", "got %d node%s", fParallel,
         fParallel == 1 ? "" : "s");

      return fParallel;
   }

   GoParallel(nodes);
   return SendCurrentState();
}

//______________________________________________________________________________
 Int_t TProof::GoParallel(Int_t nodes)
{
   // Go in parallel mode with at most "nodes" slaves (a negative value is
   // treated as 0). The active list and active monitor are rebuilt from
   // the first valid slaves found in fSlaves. Since the fSlaves list is
   // sorted by slave performance the active list will contain first the
   // most performant nodes. Returns the number of active slaves.
   // Returns -1 in case of error.

   if (!IsValid()) return -1;

   if (nodes < 0) nodes = 0;

   // start from an empty active set
   fActiveSlaves->Clear();
   fActiveMonitor->RemoveAll();

   Int_t nactive = 0;
   TIter nextSlave(fSlaves);
   while (nactive < nodes) {
      TSlave *slave = (TSlave *) nextSlave();
      if (!slave)
         break;
      if (!slave->IsValid())
         continue;
      fActiveSlaves->Add(slave);
      fActiveMonitor->Add(slave->GetSocket());
      nactive++;
   }

   // Will be activated in Collect
   fActiveMonitor->DeActivateAll();

   // Get slave status (will set the slaves fWorkDir correctly)
   AskStatus();

   // Find active slaves with unique image
   FindUniqueSlaves();

   // Send new group-view to slaves
   SendGroupView();

   const Int_t n = GetNumberOfActiveSlaves();
   if (IsMaster()) {
      if (n <= 0)
         printf("PROOF set to sequential mode\n");
      else
         printf("PROOF set to parallel mode (%d slaves)\n", n);
   }

   PDB(kGlobal,1) Info("GoParallel", "got %d node%s", n, n == 1 ? "" : "s");
   return n;
}

//______________________________________________________________________________
 void TProof::ShowCache(Bool_t all)
{
   // List contents of file cache. If all is true show all caches also on
   // slaves. If everything is ok all caches are to be the same.
   // The request is kPROOF_CACHE sub-command 1, sent to the unique slaves.

   if (!IsValid())
      return;

   TMessage request(kPROOF_CACHE);
   request << Int_t(1);
   request << all;

   Broadcast(request, kUnique);
   Collect(kUnique);
}

//______________________________________________________________________________
 void TProof::ClearCache()
{
    // Remove files from all file caches.

   if (!IsValid()) return;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(2);
   Broadcast(mess, kUnique);
   Collect(kUnique);
   // clear file map so files get send again to remote nodes
   fFileMap.clear();
}

//______________________________________________________________________________
 void TProof::ShowPackages(Bool_t all)
{
   // List contents of package directory. If all is true show all package
   // directories also on slaves. If everything is ok all package
   // directories should be the same.
   // The request is kPROOF_CACHE sub-command 3, sent to the unique slaves.

   if (!IsValid())
      return;

   TMessage request(kPROOF_CACHE);
   request << Int_t(3);
   request << all;

   Broadcast(request, kUnique);
   Collect(kUnique);
}

//______________________________________________________________________________
 void TProof::ShowEnabledPackages(Bool_t all)
{
   // List which packages are enabled. If all is true show enabled packages
   // for all active slaves. If everything is ok all active slaves should
   // have the same packages enabled.
   // The request is kPROOF_CACHE sub-command 8, sent with the default
   // Broadcast()/Collect() targets.

   if (!IsValid())
      return;

   TMessage request(kPROOF_CACHE);
   request << Int_t(8);
   request << all;

   Broadcast(request);
   Collect();
}

//______________________________________________________________________________
 void TProof::ClearPackages()
{
   // Remove all packages.

   if (!IsValid()) return;

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(4);
   Broadcast(mess, kUnique);
   Collect(kUnique);
}

//______________________________________________________________________________
 void TProof::ClearPackage(const char *package)
{
   // Remove a specific package.

   if (!IsValid()) return;

   if (!package || !strlen(package)) {
      Error("ClearPackage", "need to specify a package name");
      return;
   }

   // if name, erroneously, is a par pathname strip off .par and path
   TString pac = package;
   if (pac.EndsWith(".par"))
      pac.Remove(pac.Length()-4);
   pac = gSystem->BaseName(pac);

   TMessage mess(kPROOF_CACHE);
   mess << Int_t(5) << pac;
   Broadcast(mess, kUnique);
   Collect(kUnique);
}

//______________________________________________________________________________
 Int_t TProof::BuildPackage(const char *package)
{
   // Build specified package. Executes the PROOF-INF/BUILD.sh
   // script if it exists on all unique nodes. A trailing ".par" and any
   // leading path are stripped from the name before the request
   // (kPROOF_CACHE sub-command 6) is sent.
   // Returns fStatus as set while collecting the responses (0 in case of
   // success) and -1 in case of error.

   if (!IsValid()) return -1;

   if (!package || package[0] == '\0') {
      Error("BuildPackage", "need to specify a package name");
      return -1;
   }

   // if name, erroneously, is a par pathname strip off .par and path
   TString name = package;
   if (name.EndsWith(".par"))
      name.Remove(name.Length()-4);
   name = gSystem->BaseName(name);

   TMessage request(kPROOF_CACHE);
   request << Int_t(6);
   request << name;

   Broadcast(request, kUnique);
   Collect(kUnique);

   return fStatus;
}

//______________________________________________________________________________
 Int_t TProof::LoadPackage(const char *package)
{
   // Load specified package. Executes the PROOF-INF/SETUP.C script
   // on all active nodes. A trailing ".par" and any leading path are
   // stripped from the name before the request (kPROOF_CACHE sub-command 7)
   // is sent.
   // Returns fStatus as set while collecting the responses (0 in case of
   // success) and -1 in case of error.

   if (!IsValid()) return -1;

   if (!package || package[0] == '\0') {
      Error("LoadPackage", "need to specify a package name");
      return -1;
   }

   // if name, erroneously, is a par pathname strip off .par and path
   TString name = package;
   if (name.EndsWith(".par"))
      name.Remove(name.Length()-4);
   name = gSystem->BaseName(name);

   TMessage request(kPROOF_CACHE);
   request << Int_t(7);
   request << name;

   Broadcast(request);
   Collect();

   return fStatus;
}

//______________________________________________________________________________
 Int_t TProof::EnablePackage(const char *package)
{
   // Enable specified package: BuildPackage() (PROOF-INF/BUILD.sh, if it
   // exists) followed by LoadPackage() (PROOF-INF/SETUP.C). The package is
   // not loaded when building reports -1. A trailing ".par" and any
   // leading path are stripped from the name first.
   // Returns 0 in case of success and -1 in case of error.

   if (!IsValid()) return -1;

   if (!package || package[0] == '\0') {
      Error("EnablePackage", "need to specify a package name");
      return -1;
   }

   // if name, erroneously, is a par pathname strip off .par and path
   TString name = package;
   if (name.EndsWith(".par"))
      name.Remove(name.Length()-4);
   name = gSystem->BaseName(name);

   if (BuildPackage(name) == -1 || LoadPackage(name) == -1)
      return -1;

   return 0;
}

//______________________________________________________________________________
 Int_t TProof::UploadPackage(const char *tpar, Int_t parallel)
{
   // Upload a PROOF archive (PAR file). A PAR file is a compressed
   // tar file with one special additional directory, PROOF-INF
   // (blatantly copied from Java's jar format). It must have the extension
   // .par. A PAR file can be directly a binary or a source with a build
   // procedure. In the PROOF-INF directory there can be a build script:
   // BUILD.sh to be called to build the package, in case of a binary PAR
   // file don't specify a build script or make it a no-op. Then there is
   // SETUP.C which sets the right environment variables to use the package,
   // like LD_LIBRARY_PATH, etc.
   //
   // tpar     - path of the PAR file; must end in ".par" and be readable.
   // parallel - number of parallel streams that can be used to upload the
   //            package to the master server (forced to 1 when running
   //            on the master itself).
   //
   // Returns 0 in case of success and -1 in case of error (invalid
   // session, bad or unreadable file, checksum failure, no reply from a
   // node, or failed unpacking on a node).

   if (!IsValid()) return -1;

   TString par = tpar;
   if (!par.EndsWith(".par")) {
      // Print via the TString copy: tpar itself may be a null pointer,
      // which must not be handed to a %s conversion.
      Error("UploadPackage", "package %s must have extension .par", par.Data());
      return -1;
   }

   gSystem->ExpandPathName(par);

   if (gSystem->AccessPathName(par, kReadPermission)) {
      Error("UploadPackage", "package %s does not exist", par.Data());
      return -1;
   }

   // Strategy: get md5 of package and check if it is different from the
   // one stored on the remote node. If it is different lock the remote
   // package directory and use TFTP to ftp the package to the remote node,
   // unlock the directory.

   TMD5 *md5 = TMD5::FileChecksum(par);
   if (!md5) {
      // FileChecksum returns 0 when the file cannot be read
      Error("UploadPackage", "cannot compute MD5 checksum of package %s",
            par.Data());
      return -1;
   }

   // "+<name>" asks the node to check the package, "-<name>" asks it to
   // install the uploaded package and unlock the directory.
   TMessage mess(kPROOF_CHECKFILE);
   mess << TString("+")+TString(gSystem->BaseName(par)) << (*md5);
   TMessage mess2(kPROOF_CHECKFILE);
   mess2 << TString("-")+TString(gSystem->BaseName(par)) << (*md5);
   delete md5;

   // loop over all unique nodes
   TIter next(fUniqueSlaves);
   TSlave *sl;
   while ((sl = (TSlave *) next())) {
      if (!sl->IsValid())
         continue;

      sl->GetSocket()->Send(mess);

      // Recv leaves the message pointer at 0 on error or when the peer
      // closed the connection, so it must be checked before use.
      TMessage *reply = 0;
      sl->GetSocket()->Recv(reply);
      if (!reply) {
         Error("UploadPackage", "no reply from %s while checking package %s",
               sl->GetName(), par.Data());
         return -1;
      }

      if (reply->What() != kPROOF_CHECKFILE) {
         // remote directory is locked, upload file via TFTP
         if (IsMaster())
            parallel = 1;  // assume LAN
         {
            // scoped so that the ftp connection is closed before the
            // install request is sent
            TFTP ftp(TString("root://")+sl->GetName(), parallel);
            if (!ftp.IsZombie()) {
               ftp.cd(Form("%s/%s", kPROOF_WorkDir, kPROOF_PackDir));
               ftp.put(par, gSystem->BaseName(par));
            }
         }
         // install package and unlock dir
         sl->GetSocket()->Send(mess2);
         delete reply;
         reply = 0;
         sl->GetSocket()->Recv(reply);
         if (!reply) {
            Error("UploadPackage",
                  "no reply from %s while installing package %s",
                  sl->GetName(), par.Data());
            return -1;
         }
         if (reply->What() != kPROOF_CHECKFILE) {
            Error("UploadPackage", "unpacking of package %s failed", par.Data());
            delete reply;
            return -1;
         }
      }
      delete reply;
   }

   return 0;
}

//______________________________________________________________________________
 void TProof::Progress(Long64_t total, Long64_t processed)
{
   // Signal carrying query progress information: the total amount and
   // the amount processed so far. Connect a slot to this signal to
   // track progress.

   PDB(kGlobal,1) {
      Info("Progress","%2f (%lld/%lld)", 100.*processed/total, processed, total);
   }

   // The addresses of the two arguments are packed into the Long_t
   // array that is handed to Emit().
   Long_t args[2] = { (Long_t) (&total), (Long_t) (&processed) };
   Emit("Progress(Long64_t,Long64_t)", args);
}

//______________________________________________________________________________
 void TProof::SetActive(Bool_t active)
{
   // Resume (active == kTRUE) or suspend (active == kFALSE) PROOF via
   // Condor. Does nothing when no Condor object is attached.

   if (!fCondor)
      return;

   if (!active) {
      PDB(kCondor,1) Info("SetActive","-- Condor Suspend --");
      fCondor->Suspend();
      return;
   }

   PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
   fCondor->Resume();
}

//______________________________________________________________________________
 void TProof::Feedback(TList *objs)
{
   // Signal carrying the list of feedback objects. Connect a slot to
   // this signal to monitor the feedback objects.

   PDB(kGlobal,1) {
      Info("Feedback","%d Objects", objs->GetSize());
   }

   // With feedback debugging enabled also list the objects themselves.
   PDB(kFeedback,1) {
      Info("Feedback","%d objects", objs->GetSize());
      objs->ls();
   }

   // The list pointer travels as a Long_t signal argument.
   Emit("Feedback(TList *objs)", (Long_t) objs);
}

//______________________________________________________________________________
 void TProof::AddFeedback(const char *name)
{
   // Add name to the list of requested feedback objects. The list is
   // created on first use (owning its entries, named "FeedbackList")
   // and registered via AddInput(). A name already in the list is not
   // added a second time.

   if (!fFeedback) {
      fFeedback = new TList;
      fFeedback->SetOwner();
      fFeedback->SetName("FeedbackList");
      AddInput(fFeedback);
   }

   if (fFeedback->FindObject(name))
      return;   // already requested

   fFeedback->Add(new TObjString(name));
}

//______________________________________________________________________________
 void TProof::RemoveFeedback(const char *name)
{
   if (fFeedback == 0) return;

   TObject *obj = fFeedback->FindObject(name);
   if (obj != 0) {
      fFeedback->Remove(obj);
      delete obj;
   }
}

//______________________________________________________________________________
 void TProof::ClearFeedback()
{
   // Delete all entries of the feedback list, if the list exists.

   if (fFeedback)
      fFeedback->Delete();
}

//______________________________________________________________________________
 void TProof::ShowFeedback() const
{
   if (fFeedback == 0 || fFeedback->GetSize() == 0) {
      Info("","no feedback requested");
      return;
   }

   fFeedback->Print();
}

//______________________________________________________________________________
 TList *TProof::GetFeedbackList() const
{
   // Return the feedback list pointer as stored in fFeedback; it is 0
   // while no list has been created.

   return fFeedback;
}


ROOT page - Class index - Class Hierarchy - Top of the page

This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.