// @(#)root/proof:$Name:  $:$Id: TProofPlayer.cxx,v 1.38 2004/07/09 01:34:51 rdm Exp $
// Author: Maarten Ballintijn   07/01/02

/*************************************************************************
 * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProofPlayer                                                         //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TProofPlayer.h"

#include "THashList.h"
#include "TEventIter.h"
#include "TVirtualPacketizer.h"
#include "TPacketizer.h"
#include "TPacketizer2.h"
#include "TSelector.h"
#include "TSocket.h"
#include "TProofServ.h"
#include "TProof.h"
#include "TSlave.h"
#include "TROOT.h"
#include "TError.h"
#include "MessageTypes.h"
#include "TMessage.h"
#include "TDSetProxy.h"
#include "TString.h"
#include "TSystem.h"
#include "TFile.h"
#include "TProofDebug.h"
#include "TTimer.h"
#include "TMap.h"
#include "TPerfStats.h"
#include "TStatus.h"

#include "Api.h"


class TAutoBinVal : public TObjString {
private:
   Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;

public:
   TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
               Double_t ymax, Double_t zmin, Double_t zmax) : TObjString(name)
   {
      fXmin = xmin; fXmax = xmax;
      fYmin = ymin; fYmax = ymax;
      fZmin = zmin; fZmax = zmax;
   }
   void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
               Double_t& ymax, Double_t& zmin, Double_t& zmax)
   {
      xmin = fXmin; xmax = fXmax;
      ymin = fYmin; ymax = fYmax;
      zmin = fZmin; zmax = fZmax;
   }

};


//------------------------------------------------------------------------------


ClassImp(TProofPlayer)

//______________________________________________________________________________
TProofPlayer::TProofPlayer()
   : fAutoBins(0), fOutput(0), fSelector(0), fSelectorClass(0),
     fFeedbackTimer(0), fEvIter(0), fSelStatus(0)
{
   // Default ctor.

   fInput         = new TList;
}

//______________________________________________________________________________
TProofPlayer::~TProofPlayer()
{
   delete fInput;
   if (fSelectorClass && fSelectorClass->IsLoaded()) delete fSelector;
   delete fFeedbackTimer;
   delete fEvIter;
}

//______________________________________________________________________________
void TProofPlayer::StopProcess(Bool_t abort)
{
   if (fEvIter != 0) fEvIter->StopProcess(abort);
}

//______________________________________________________________________________
void TProofPlayer::AddInput(TObject *inp)
{
   fInput->Add(inp);
}

//______________________________________________________________________________
void TProofPlayer::ClearInput()
{
   fInput->Clear();
}

//______________________________________________________________________________
TObject *TProofPlayer::GetOutput(const char *name) const
{
   if (fOutput != 0) {
      return fOutput->FindObject(name);
   } else {
      return 0;
   }
}

//______________________________________________________________________________
TList *TProofPlayer::GetOutputList() const
{
   return fOutput;
}

//______________________________________________________________________________
void TProofPlayer::StoreOutput(TList *)
{
   MayNotUse("StoreOutput");
}

//______________________________________________________________________________
void TProofPlayer::StoreFeedback(TObject *, TList *)
{
   MayNotUse("StoreFeedback");
}

//______________________________________________________________________________
void TProofPlayer::Progress(Long64_t total, Long64_t processed)
{
   PDB(kGlobal,1)
      Info("Progress","%2f (%lld/%lld)", 100.*processed/total, processed, total);

   Long_t parm[2];
   parm[0] = (Long_t) (&total);
   parm[1] = (Long_t) (&processed);
   Emit("Progress(Long64_t,Long64_t)", parm);

   gProof->Progress(total, processed);
}

//______________________________________________________________________________
void TProofPlayer::Feedback(TList *objs)
{
   PDB(kGlobal,1) Info("Feedback","%d Objects", objs->GetSize());
   PDB(kFeedback,1) {
      Info("Feedback","%d Objects", objs->GetSize());
      objs->ls();
   }

   Emit("Feedback(TList *objs)", (Long_t) objs);

   gProof->Feedback(objs);
}

//______________________________________________________________________________
Int_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
                            Option_t *option, Long64_t nentries, Long64_t first,
                            TEventList * /*evl*/)
{
   PDB(kGlobal,1) Info("Process","Enter");

   fOutput = 0;
   if (fSelectorClass && fSelectorClass->IsLoaded()) delete fSelector;
   fSelector = TSelector::GetSelector(selector_file);

   if ( !fSelector ) {
      fSelectorClass = 0;
      Error("Process", "Cannot load: %s", selector_file );
      return -1;
   }

   fSelectorClass = fSelector->IsA();
   Int_t version = fSelector->Version();

   fOutput = fSelector->GetOutputList();

   TPerfStats::Start(fInput, fOutput);

   fSelStatus = new TStatus;
   fOutput->Add(fSelStatus);

   TCleanup clean(this);
   SetupFeedback();

   fSelector->SetOption(option);
   fSelector->SetInputList(fInput);

   dset->Reset();

   fEvIter = TEventIter::Create(dset, fSelector, first, nentries);


   if (version == 0) {
      PDB(kLoop,1) Info("Process","Call Begin(0)");
      fSelector->Begin(0);
   } else {
      if (gProof != 0 && !gProof->IsMaster()) {
         // on client (for local run)
         PDB(kLoop,1) Info("Process","Call Begin(0)");
         fSelector->Begin(0);
      }
      if (fSelStatus->IsOk()) {
         PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
         fSelector->SlaveBegin(0);  // Init is called explicitly
                                    // from GetNextEvent()
      }
   }

   PDB(kLoop,1) Info("Process","Looping over Process()");

   // Loop over range
   Long64_t entry;
   while (fSelStatus->IsOk() && (entry = fEvIter->GetNextEvent()) >= 0 && fSelStatus->IsOk()) {

      if(version == 0) {
         PDB(kLoop,3)Info("Process","Call ProcessCut(%lld)", entry);
         if(fSelector->ProcessCut(entry)) {
            PDB(kLoop,3)Info("Process","Call ProcessFill(%lld)", entry);
            fSelector->ProcessFill(entry);
         }
      } else {
         PDB(kLoop,3)Info("Process","Call Process(%lld)", entry);
         fSelector->Process(entry);
      }

      gSystem->DispatchOneEvent(kTRUE);
      if (gROOT->IsInterrupted()) break;
   }

   StopFeedback();

   delete fEvIter; fEvIter = 0;

   // Finalize

   if (fSelStatus->IsOk()) {
      if (version == 0) {
         PDB(kLoop,1) Info("Process","Call Terminate()");
         fSelector->Terminate();
      } else {
         PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
         fSelector->SlaveTerminate();
         if (gProof != 0 && !gProof->IsMaster() && fSelStatus->IsOk()) {
            PDB(kLoop,1) Info("Process","Call Terminate()");
            fSelector->Terminate();
         }
      }
   }

   TPerfStats::Stop();

   return 0;
}

//______________________________________________________________________________
void TProofPlayer::UpdateAutoBin(const char *name,
                                 Double_t& xmin, Double_t& xmax,
                                 Double_t& ymin, Double_t& ymax,
                                 Double_t& zmin, Double_t& zmax)
{
   if ( fAutoBins == 0 ) {
      fAutoBins = new THashList;
   }

   TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);

   if ( val == 0 ) {
      val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
      fAutoBins->Add(val);
   } else {
      val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
   }
}

//______________________________________________________________________________
TDSetElement *TProofPlayer::GetNextPacket(TSlave *, TMessage *)
{
   MayNotUse("GetNextPacket");
   return 0;
}

//______________________________________________________________________________
void TProofPlayer::SetupFeedback()
{
   MayNotUse("SetupFeedback");
}

//______________________________________________________________________________
void TProofPlayer::StopFeedback()
{
   MayNotUse("StopFeedback");
}

//______________________________________________________________________________
Int_t TProofPlayer::DrawSelect(TDSet *set, const char *varexp,
                               const char *selection, Option_t *option,
                               Long64_t nentries, Long64_t firstentry)
{
   TNamed *varexpobj = new TNamed("varexp", varexp);
   TNamed *selectionobj = new TNamed("selection", selection);

   fInput->Clear();  // good idea? what about a feedbacklist, but old query
                     // could have left objs? clear at end? no, may want to
                     // rerun, separate player?

   fInput->Add(varexpobj);
   fInput->Add(selectionobj);

   Int_t r = Process(set, "TProofDraw", option, nentries, firstentry);

   fInput->Remove(varexpobj);
   fInput->Remove(selectionobj);
   delete varexpobj;
   delete selectionobj;

   return r;
}



//------------------------------------------------------------------------------

ClassImp(TProofPlayerLocal)


//------------------------------------------------------------------------------

ClassImp(TProofPlayerRemote)


//______________________________________________________________________________
TProofPlayerRemote::~TProofPlayerRemote()
{
   delete fOutput;      // owns the output list
   delete fOutputLists;

   if (fFeedbackLists != 0) {
      TIter next(fFeedbackLists);
      while (TMap *m = (TMap*) next()) {
         m->DeleteValues();
      }
   }
   delete fFeedbackLists;
   delete fPacketizer;
}

//______________________________________________________________________________
Int_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
                                  Option_t *option, Long64_t nentries,
                                  Long64_t first, TEventList * /*evl*/)
{
   // Process specified TDSet on PROOF.
   // Returns -1 in case error, 0 otherwise.

   PDB(kGlobal,1) Info("Process","Enter");

   delete fOutput;
   fOutput = new TList;

   if(fProof->IsMaster()){
      TPerfStats::Start(fInput, fOutput);
   } else {
      TPerfStats::Setup(fInput);
   }

   // If the filename does not contain "." assume class is compiled in
   if ( strchr(selector_file,'.') != 0 ) {
      TString filename = selector_file;
      TString aclicMode;
      TString arguments;
      TString io;
      filename = gSystem->SplitAclicMode(filename, aclicMode, arguments, io);

      PDB(kSelector,1) Info("Process", "Sendfile: %s", filename.Data() );
      if ( fProof->SendFile(filename) == -1 ) return -1;

      // NOTE: should we allow more extension?
      if ( filename.EndsWith(".C") ) {
         filename.Replace(filename.Length()-1,1,"h");
         if (!gSystem->AccessPathName(filename,kReadPermission)) {
            PDB(kSelector,1) Info("Process", "SendFile: %s", filename.Data() );
            if ( fProof->SendFile(filename) == -1 ) return -1;
         }
      }
   }

   TMessage mesg(kPROOF_PROCESS);
   TString fn(gSystem->BaseName(selector_file));

   TDSet *set = dset;
   if (fProof->IsMaster()) {

      PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
      set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
                            dset->GetDirectory() );

      delete fPacketizer;
      if (fInput->FindObject("PROOF_NewPacketizer") != 0) {
         Info("Process","!!! Using TPacketizer2 !!!");
         fPacketizer = new TPacketizer2(dset, fProof->GetListOfActiveSlaves(),
                                        first, nentries, fInput);
      } else {
         PDB(kGlobal,1) Info("Process","Using Standard TPacketizer");
         fPacketizer = new TPacketizer(dset, fProof->GetListOfActiveSlaves(),
                                       first, nentries, fInput);
      }

      if ( !fPacketizer->IsValid() ) {
         return -1;
      }
   } else {
      if (fSelectorClass && fSelectorClass->IsLoaded()) delete fSelector;
      fSelectorClass = 0;
      fSelector = TSelector::GetSelector(selector_file);
      if (fSelector == 0) return -1;
      fSelectorClass = fSelector->IsA();
      fSelector->SetInputList(fInput);
      fSelector->Begin(0);
   }

   TCleanup clean(this);
   SetupFeedback();

   TString opt = option;
   mesg << set << fn << fInput << opt << nentries << first; // no evl yet

   PDB(kGlobal,1) Info("Process","Calling Broadcast");
   fProof->Broadcast(mesg);

   PDB(kGlobal,1) Info("Process","Calling Collect");
   fProof->Collect();

   StopFeedback();

   PDB(kGlobal,1) Info("Process","Calling Merge Output");
   MergeOutput();


   if (fProof->IsMaster()) {
      TPerfStats::Stop();
   } else {
      TIter next(fOutput);
      TList *output = fSelector->GetOutputList();
      while(TObject* obj = next()) {
         output->Add(obj);
      }
      fSelector->Terminate();
   }

   return 0;
}


//______________________________________________________________________________
void TProofPlayerRemote::MergeOutput()
{
   PDB(kOutput,1) Info("MergeOutput","Enter");

   if ( fOutputLists == 0 ) {
      PDB(kOutput,1) Info("MergeOutput","Leave (no output)");
      return;
   }

   TIter next(fOutputLists);

   TList *list;
   while ( (list = (TList *) next()) ) {
      Long_t offset = 0;

      TObject *obj = fOutput->FindObject(list->GetName());

      if (obj == 0) {
         obj = list->First();
         list->Remove(obj);
         fOutput->Add(obj);
      }

      if ( list->IsEmpty() ) continue;

      // direct CINT, also possible via TInterpreter?
      G__ClassInfo ci(obj->ClassName());
      G__CallFunc cf;

      if ( ci.IsValid() )
         cf.SetFuncProto( &ci, "Merge", "TCollection*", &offset);

      if ( cf.IsValid() ) {
         cf.SetArg((Long_t)list);
         cf.Exec(obj);
      } else {
         // No Merge interface, return individual objects
         while ( (obj = list->First()) ) {
            fOutput->Add(obj);
            list->Remove(obj);
         }
      }
   }

   delete fOutputLists; fOutputLists = 0;
   PDB(kOutput,1) Info("MergeOutput","Leave (%d object(s))", fOutput->GetSize());
}

//______________________________________________________________________________
void TProofPlayerRemote::StopProcess(Bool_t abort)
{
   if (fPacketizer != 0) fPacketizer->StopProcess(abort);
}

//______________________________________________________________________________
void TProofPlayerRemote::StoreOutput(TList *out)
{
   PDB(kOutput,1) Info("StoreOutput","Enter");

   if ( out == 0 ) {
      PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
      return;
   }

   TIter next(out);
   out->SetOwner(kFALSE);  // take ownership of the contents

   if (fOutputLists == 0) {
      PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
      fOutputLists = new TList;
      fOutputLists->SetOwner();
   }

   TObject *obj;
   while( (obj = next()) ) {
      PDB(kOutput,2) Info("StoreOutput","Find '%s'", obj->GetName() );

      TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
      if ( list == 0 ) {
         PDB(kOutput,2) Info("StoreOutput","List not Found (creating)", obj->GetName() );
         list = new TList;
         list->SetName( obj->GetName() );
         list->SetOwner();
         fOutputLists->Add( list );
      }
      list->Add( obj );
   }

   delete out;
   PDB(kOutput,1) Info("StoreOutput","Leave");
}

//______________________________________________________________________________
TList *TProofPlayerRemote::MergeFeedback()
{
   PDB(kFeedback,1) Info("MergeFeedback","Enter");

   if ( gProof->IsMaster() ) {
      // process local feedback objects

      TList *fb = new TList;
      TIter next(fFeedback);
      while( TObjString *name = (TObjString*) next() ) {
         TObject *o = fOutput->FindObject(name->GetName());
         if (o != 0) fb->Add(o->Clone());
      }

      StoreFeedback(this, fb); // adopts fb
   }


   if ( fFeedbackLists == 0 ) {
      PDB(kFeedback,1) Info("MergeFeedback","Leave (no output)");
      return 0;
   }

   TList *fb = new TList;   // collection of feedback object

   TIter next(fFeedbackLists);

   TMap *map;
   while ( (map = (TMap*) next()) ) {
      Long_t offset = 0;

      // turn map into list ...

      TList *list = new TList;
      TIter keys(map);

      while ( TObject *key = keys() ) {
         list->Add(map->GetValue(key));
      }

      // clone first object, remove from list

      TObject *obj = list->First();
      list->Remove(obj);
      obj = obj->Clone();
      fb->Add(obj);

      if ( list->IsEmpty() ) {
         delete list;
         continue;
      }

      // merge list with clone
      // direct CINT, also possible via TInterpreter?
      G__ClassInfo ci(obj->ClassName());
      G__CallFunc cf;

      if ( ci.IsValid() )
         cf.SetFuncProto( &ci, "Merge", "TCollection*", &offset);

      if ( cf.IsValid() ) {
         cf.SetArg((Long_t)list);
         cf.Exec(obj);
      } else {
         // No Merge interface, return copy of individual objects
         while ( (obj = list->First()) ) {
            fb->Add(obj->Clone());
            list->Remove(obj);
         }
      }

      delete list;
   }

   PDB(kFeedback,1) Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());

   return fb;
}

//______________________________________________________________________________
void TProofPlayerRemote::StoreFeedback(TObject *slave, TList *out)
{
   PDB(kFeedback,1) Info("StoreFeedback","Enter");

   if ( out == 0 ) {
      PDB(kFeedback,1) Info("StoreFeedback","Leave (empty)");
      return;
   }

   if ( !gProof->IsMaster() ) {
      // in client
      Feedback(out);
      delete out;
      return;
   }

   if (fFeedbackLists == 0) {
      PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
      fFeedbackLists = new TList;
      fFeedbackLists->SetOwner();
   }

   TIter next(out);
   out->SetOwner(kFALSE);  // take ownership of the contents

   TObject *obj;
   while( (obj = next()) ) {
      PDB(kFeedback,2) Info("StoreFeedback","Find '%s'", obj->GetName() );

      TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
      if ( map == 0 ) {
         PDB(kFeedback,2) Info("StoreFeedback","Map not Found (creating)", obj->GetName() );
         map = new TMap;
         map->SetName( obj->GetName() );
         // TODO: needed? allowed? map->SetOwner();
         fFeedbackLists->Add(map);
      }

      map->Remove(slave);
      map->Add(slave, obj);
   }

   delete out;
   PDB(kFeedback,1) Info("StoreFeedback","Leave");
}

//______________________________________________________________________________
void TProofPlayerRemote::SetupFeedback()
{
   if (!gProof->IsMaster()) return; // Client does not need timer

   fFeedback = (TList*) fInput->FindObject("FeedbackList");

   PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
      fFeedback == 0 ? "NOT ":"");

   if (fFeedback == 0) return;

   // OK, feedback was requested, setup the timer

   fFeedbackTimer = new TTimer;
   fFeedbackTimer->SetObject(this);
   fFeedbackTimer->Start(500,kFALSE);
}

//______________________________________________________________________________
void TProofPlayerRemote::StopFeedback()
{
   if (fFeedbackTimer == 0) return;

   PDB(kFeedback,1) Info("StopFeedback","Stop Timer");

   delete fFeedbackTimer; fFeedbackTimer = 0;
}

//______________________________________________________________________________
Bool_t TProofPlayerRemote::HandleTimer(TTimer *)
{
   PDB(kFeedback,2) Info("HandleTimer","Entry");

   if ( fFeedbackTimer == 0 ) return kFALSE; // timer already switched off

   if ( fFeedbackLists == 0 ) return kFALSE;

   TList *fb = MergeFeedback();

   PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());

   TMessage m(kPROOF_FEEDBACK);
   m << fb;

   // send message to client;
   gProofServ->GetSocket()->Send(m);

   delete fb;
   return kFALSE; // ignored?
}

//______________________________________________________________________________
TDSetElement *TProofPlayerRemote::GetNextPacket(TSlave *slave, TMessage *r)
{
   TDSetElement *e = fPacketizer->GetNextPacket( slave, r );

   if ( e != 0 ) {
      PDB(kPacketizer,2)
         Info("GetNextPacket","To slave-%d (%s): '%s' '%s' '%s' %lld %lld",
              slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
              e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
   } else {
      PDB(kPacketizer,2) Info("GetNextPacket","Done");
   }

   return e;
}


//------------------------------------------------------------------------------


ClassImp(TProofPlayerSlave)


//______________________________________________________________________________
TProofPlayerSlave::TProofPlayerSlave()
{
  fSocket = 0;
  fFeedback = 0;
}

//______________________________________________________________________________
TProofPlayerSlave::TProofPlayerSlave(TSocket *socket)
{
      fSocket = socket;
      fFeedback = 0;
}

//______________________________________________________________________________
void TProofPlayerSlave::SetupFeedback()
{
   //
   TList *fb = (TList*) fInput->FindObject("FeedbackList");

   PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
      fb == 0 ? "NOT ":"");

   if (fb == 0) return;

   // OK, feedback was requested, setup the timer

   fFeedbackTimer = new TTimer;
   fFeedbackTimer->SetObject(this);
   fFeedbackTimer->Start(500,kFALSE);

   fFeedback = fb;

}

//______________________________________________________________________________
void TProofPlayerSlave::StopFeedback()
{
   if (fFeedbackTimer == 0) return;

   PDB(kFeedback,1) Info("StopFeedback","Stop Timer");

   fFeedbackTimer->Stop();
   delete fFeedbackTimer;
   fFeedbackTimer = 0;
}

//______________________________________________________________________________
Bool_t TProofPlayerSlave::HandleTimer(TTimer *)
{
   PDB(kFeedback,2) Info("HandleTimer","Entry");

   if ( fFeedback == 0 ) return kFALSE;

   TList *fb = new TList;
   fb->SetOwner(kFALSE);

   if (fOutput == 0) {
      fOutput = fSelector->GetOutputList();
   }

   if (fOutput) {
      TIter next(fFeedback);
      while( TObjString *name = (TObjString*) next() ) {
         // TODO: find object in memory ... maybe allow only in fOutput ?
         TObject *o = fOutput->FindObject(name->GetName());
         if (o != 0) fb->Add(o);
      }
   }

   PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());

   TMessage m(kPROOF_FEEDBACK);
   m << fb;

   // send message to client;
   gProofServ->GetSocket()->Send(m);

   delete fb;
   return kFALSE; // ignored?
}


//______________________________________________________________________________
Int_t TProofPlayerSlave::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
                               const char * /*selection*/, Option_t * /*option*/,
                               Long64_t /*nentries*/, Long64_t /*firstentry*/)
{
   MayNotUse("DrawSelect");

   return -1;
}


ROOT page - Class index - Class Hierarchy - Top of the page

This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.