// @(#)root/proof:$Name:  $:$Id: TProofPlayer.h,v 1.40 2006/11/22 14:16:54 rdm Exp $
// Author: Maarten Ballintijn   07/01/02

/*************************************************************************
 * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TProofPlayer
#define ROOT_TProofPlayer


//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProofPlayer                                                         //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#ifndef ROOT_TArrayL64
#include "TArrayL64.h"
#endif
#ifndef ROOT_TList
#include "TList.h"
#endif
#ifndef ROOT_TObject
#include "TObject.h"
#endif
#ifndef ROOT_TQObject
#include "TQObject.h"
#endif
#ifndef ROOT_TSystem
#include "TSystem.h"
#endif
#ifndef ROOT_TQueryResult
#include "TQueryResult.h"
#endif

// Forward declarations for types that this header only uses through
// pointers; their full definitions are not needed here.
class TClass;               // used by TProofPlayer::fSelectorClass
class TSelector;
class TDSet;
class TDSetElement;
class TSlave;
class TEventList;
class TProof;
class TSocket;
class TVirtualPacketizer;
class TMessage;
class TEventIter;
class TProofStats;
class TMutex;
class TStatus;
class TTimer;


//------------------------------------------------------------------------

// Base PROOF player (ClassDef title: "Abstract PROOF player").
// Declares the query-processing interface (Process / Finalize / DrawSelect),
// the input/output list accessors, the query-result bookkeeping and the
// progress / feedback signals. Most member functions are only declared
// here; their definitions live outside this header.
class TProofPlayer : public TObject, public TQObject {

public:
   // Values returned by GetExitStatus()
   enum EExitStatus { kFinished, kStopped, kAborted };

private:
   TList        *fAutoBins;  // Map of min/max values by name for slaves

protected:
   TList        *fInput;           //-> list with input objects
   TList        *fOutput;          //   list with output objects
   TSelector    *fSelector;        //!  the latest selector
   TClass       *fSelectorClass;   //!  class of the latest selector
   TTimer       *fFeedbackTimer;   //!  timer for sending intermediate results
   TEventIter   *fEvIter;          //!  iterator on events or objects
   TStatus      *fSelStatus;       //!  status of query in progress
   EExitStatus   fExitStatus;      //   exit status
   Long64_t      fEventsProcessed; //   number of events processed
   Long64_t      fTotalEvents;     //   number of events requested

   TList        *fQueryResults;    //List of TQueryResult
   TQueryResult *fQuery;           //Instance of TQueryResult currently processed
   TQueryResult *fPreviousQuery;   //Previous instance of TQueryResult processed
   Int_t         fDrawQueries;     //Number of Draw queries in the list
   Int_t         fMaxDrawQueries;  //Max number of Draw queries kept

   TTimer       *fStopTimer;       //Timer associated with a stop request
   TMutex       *fStopTimerMtx;    //To protect the stop timer

   void         *GetSender() { return this; }  //used to set gTQSender

   virtual void SetupFeedback();  // specialized setup

public:   // fix for broken compilers so TCleanup can call StopFeedback()
   virtual void StopFeedback();   // specialized teardown

protected:
   // Scope guard: its destructor calls StopFeedback() on the player it was
   // constructed with, so the teardown runs on every exit path of the scope
   // that owns the guard.
   class TCleanup {
   private:
      TProofPlayer *fPlayer;   // player to tear down; never deleted by the guard
   public:
      TCleanup(TProofPlayer *p) : fPlayer(p) { }
      // No logging here: the guard fires on every scope exit, normal ones
      // included, so an error-level syslog entry would misreport routine
      // cleanup as a failure. A null player is tolerated.
      ~TCleanup() { if (fPlayer) fPlayer->StopFeedback(); }
   };

public:
   TProofPlayer();
   virtual ~TProofPlayer();

   // Query processing entry points
   virtual Long64_t  Process(TDSet *set,
                             const char *selector, Option_t *option = "",
                             Long64_t nentries = -1, Long64_t firstentry = 0,
                             TEventList *evl = 0);
   virtual Long64_t  Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
   virtual Long64_t  Finalize(TQueryResult *qr);
   virtual Long64_t  DrawSelect(TDSet *set, const char *varexp,
                                const char *selection, Option_t *option = "",
                                Long64_t nentries = -1, Long64_t firstentry = 0);

   // Stop request, input/output lists and query-result bookkeeping
   virtual void      StopProcess(Bool_t abort, Int_t timeout = -1);
   virtual void      AddInput(TObject *inp);
   virtual void      ClearInput();
   virtual TObject  *GetOutput(const char *name) const;
   virtual TList    *GetOutputList() const;
   virtual TList    *GetInputList() const { return fInput; }
   virtual TList    *GetListOfResults() const { return fQueryResults; }
   virtual void      AddQueryResult(TQueryResult *q);
   virtual TQueryResult *GetCurrentQuery() const { return fQuery; }
   virtual TQueryResult *GetQueryResult(const char *ref);
   virtual void      RemoveQueryResult(const char *ref);
   virtual void      SetCurrentQuery(TQueryResult *q);
   virtual void      SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
   // Makes the previously processed query the current one again
   virtual void      RestorePreviousQuery() { fQuery = fPreviousQuery; }
   virtual Int_t     AddOutputObject(TObject *obj);
   virtual void      AddOutput(TList *out);   // Incorporate a list
   virtual void      StoreOutput(TList *out);   // Adopts the list
   virtual void      StoreFeedback(TObject *slave, TList *out); // Adopts the list

   // Progress and feedback notifications
   virtual void      Progress(Long64_t total, Long64_t processed); // *SIGNAL*
   // Per-slave variant: the slave argument is ignored at this level and the
   // call is forwarded to the two-argument overload
   virtual void      Progress(TSlave *, Long64_t total, Long64_t processed)
                        { Progress(total, processed); }
   virtual void      Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                              Float_t initTime, Float_t procTime,
                              Float_t evtrti, Float_t mbrti); // *SIGNAL*
   virtual void      Feedback(TList *objs); // *SIGNAL*

   virtual TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);

   virtual Int_t     ReinitSelector(TQueryResult *qr);

   // Non-virtual; all six range limits are taken by non-const reference
   // (presumably updated in place from fAutoBins - confirm in the implementation)
   void              UpdateAutoBin(const char *name,
                                   Double_t& xmin, Double_t& xmax,
                                   Double_t& ymin, Double_t& ymax,
                                   Double_t& zmin, Double_t& zmax);

   // The base implementation reports kFALSE
   virtual Bool_t    IsClient() const { return kFALSE; }

   virtual EExitStatus GetExitStatus() const { return fExitStatus; }
   virtual Long64_t    GetEventsProcessed() const { return fEventsProcessed; }
   // Adds ev to the running count of processed events
   virtual void        AddEventsProcessed(Long64_t ev) { fEventsProcessed += ev; }

   // Stop/abort timer handling
   virtual void      HandleAbortTimer();
   virtual void      HandleStopTimer();
   virtual void      SetStopTimer(Bool_t on = kTRUE, Bool_t abort = kFALSE, Int_t timeout = 0);

   ClassDef(TProofPlayer,0)  // Abstract PROOF player
};


//------------------------------------------------------------------------

class TProofPlayerLocal : public TProofPlayer {

public:
   TProofPlayerLocal() { }

   ClassDef(TProofPlayerLocal,0)  // PROOF player running on client
};


//------------------------------------------------------------------------

// Player used on the master server (see the ClassDef title below).
// Overrides the processing, output-merging, progress and feedback entry
// points of TProofPlayer and keeps a link to the TProof session it was
// constructed with.
class TProofPlayerRemote : public TProofPlayer {

private:
   TProof             *fProof;         // link to associated PROOF session
   TList              *fOutputLists;   // results returned by slaves
   TList              *fFeedback;      // reference for use on master
   TList              *fFeedbackLists; // intermediate results
   TVirtualPacketizer *fPacketizer;    // transform TDSet into packets for slaves
   TDSet              *fDSet;          //!tdset for current processing

   TList              *MergeFeedback();

protected:
   virtual Bool_t  HandleTimer(TTimer *timer);
   TProof         *GetProof() const { return fProof; }
   virtual Bool_t  SendSelector(const char *selector_file); //send selector to slaves
   virtual void    SetupFeedback();  // specialized setup
   virtual void    StopFeedback();   // specialized teardown

public:
   // Stores the session pointer and zeroes every other pointer member.
   // fDSet is zeroed explicitly as well, so it never holds an indeterminate
   // value.
   TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
                                           fFeedbackLists(0), fPacketizer(0), fDSet(0) {}
   virtual ~TProofPlayerRemote();   // Owns the fOutput list

   // Query processing entry points (same signatures as in TProofPlayer)
   Long64_t       Process(TDSet *set, const char *selector,
                          Option_t *option = "", Long64_t nentries = -1,
                          Long64_t firstentry = 0, TEventList *evl = 0);
   Long64_t       Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
   Long64_t       Finalize(TQueryResult *qr);
   Long64_t       DrawSelect(TDSet *set, const char *varexp,
                             const char *selection, Option_t *option = "",
                             Long64_t nentries = -1, Long64_t firstentry = 0);

   // Stop request and output / feedback handling
   void           StopProcess(Bool_t abort, Int_t timeout = -1);
   void           StoreOutput(TList *out);   // Adopts the list
   void           StoreFeedback(TObject *slave, TList *out); // Adopts the list
   Int_t          Incorporate(TObject *obj, TList *out, Bool_t &merged);
   Int_t          AddOutputObject(TObject *obj);
   void           AddOutput(TList *out);   // Incorporate a list
   void           MergeOutput();

   // Progress and feedback notifications
   void           Progress(Long64_t total, Long64_t processed); // *SIGNAL*
   // Per-slave variant: the slave argument is ignored and the call is
   // forwarded to the two-argument overload
   void           Progress(TSlave*, Long64_t total, Long64_t processed)
                     { Progress(total, processed); }
   void           Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                           Float_t initTime, Float_t procTime,
                           Float_t evtrti, Float_t mbrti); // *SIGNAL*
   void           Feedback(TList *objs); // *SIGNAL*
   TDSetElement  *GetNextPacket(TSlave *slave, TMessage *r);

   Bool_t         IsClient() const;

   ClassDef(TProofPlayerRemote,0)  // PROOF player running on master server
};


//------------------------------------------------------------------------

// Player used on a slave server (see the ClassDef title below).
// Holds the socket it was constructed with and a feedback list, and
// overrides the feedback setup/teardown hooks and DrawSelect().
class TProofPlayerSlave : public TProofPlayer {

private:
   // Socket handed to the constructor; nothing in this header deletes it
   TSocket *fSocket;
   TList   *fFeedback;  // List of objects to send updates of

   // Timer handler, kept private to this class
   virtual Bool_t HandleTimer(TTimer *timer);

protected:
   // Overrides of the TProofPlayer feedback hooks
   virtual void SetupFeedback();
   virtual void StopFeedback();

public:
   // Records the socket; the feedback list starts out null
   TProofPlayerSlave(TSocket *socket = 0)
      : TProofPlayer(), fSocket(socket), fFeedback(0) { }

   // Same signature and defaults as TProofPlayer::DrawSelect
   virtual Long64_t DrawSelect(TDSet *set, const char *varexp,
                               const char *selection, Option_t *option = "",
                               Long64_t nentries = -1, Long64_t firstentry = 0);

   ClassDef(TProofPlayerSlave,0)  // PROOF player running on slave server
};


//------------------------------------------------------------------------

// Player used on a super master (see the ClassDef title below).
// Extends TProofPlayerRemote with per-slave progress arrays, a slave list
// and a flag controlling feedback return.
class TProofPlayerSuperMaster : public TProofPlayerRemote {

private:
   TArrayL64 fSlaveProgress;
   TArrayL64 fSlaveTotals;
   TList     fSlaves;
   Bool_t    fReturnFeedback;

protected:
   virtual Bool_t HandleTimer(TTimer *timer);
   virtual void   SetupFeedback();

public:
   // Passes the session on to TProofPlayerRemote; fReturnFeedback starts
   // as kFALSE
   TProofPlayerSuperMaster(TProof *proof = 0)
      : TProofPlayerRemote(proof), fReturnFeedback(kFALSE) { }
   virtual ~TProofPlayerSuperMaster() { }

   virtual Long64_t Process(TDSet *set, const char *selector,
                            Option_t *option = "", Long64_t nentries = -1,
                            Long64_t firstentry = 0, TEventList *evl = 0);

   // The two overloads below forward unchanged to TProofPlayerRemote
   virtual void  Progress(Long64_t total, Long64_t processed)
   {
      TProofPlayerRemote::Progress(total, processed);
   }
   virtual void  Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
                          Float_t initTime, Float_t procTime,
                          Float_t evtrti, Float_t mbrti)
   {
      TProofPlayerRemote::Progress(total, processed, bytesread,
                                   initTime, procTime, evtrti, mbrti);
   }

   // Per-slave variant, defined outside this header
   virtual void  Progress(TSlave *sl, Long64_t total, Long64_t processed);

   ClassDef(TProofPlayerSuperMaster,0)  // PROOF player running on super master
};

#endif


// ROOT page - Class index - Class Hierarchy - Top of the page
//
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.