ROOT logo
// @(#)root/proofplayer:$Id: TVirtualPacketizer.cxx 25918 2008-10-22 15:00:04Z ganis $
// Author: Maarten Ballintijn    9/7/2002

/*************************************************************************
 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TVirtualPacketizer                                                   //
//                                                                      //
// The packetizer is a load balancing object created for each query.    //
// It generates packets to be processed on PROOF worker servers.        //
// A packet is an event range (begin entry and number of entries) or    //
// object range (first object and number of objects) in a TTree         //
// (entries) or a directory (objects) in a file.                        //
// Packets are generated taking into account the performance of the     //
// remote machine, the time it took to process a previous packet on     //
// the remote machine, the locality of the database files, etc.         //
//                                                                      //
// TVirtualPacketizer includes common parts of PROOF packetizers.       //
// Look in subclasses for details.                                      //
// The default packetizer is TPacketizerAdaptive.                       //
// To use an alternative one, for instance - the TPacketizer, call:     //
// proof->SetParameter("PROOF_Packetizer", "TPacketizer");              //
//                                                                      //
//////////////////////////////////////////////////////////////////////////


#include "TVirtualPacketizer.h"
#include "TEnv.h"
#include "TFile.h"
#include "TTree.h"
#include "TKey.h"
#include "TDSet.h"
#include "TError.h"
#include "TEventList.h"
#include "TEntryList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TObjString.h"

#include "TProof.h"
#include "TProofDebug.h"
#include "TProofPlayer.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TTimer.h"
#include "TUrl.h"
#include "TMath.h"
#include "TMonitor.h"
#include "TNtupleD.h"
#include "TPerfStats.h"

ClassImp(TVirtualPacketizer)

//______________________________________________________________________________
TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st)
{
   // Constructor.

   fProgressStatus = st;
   if (!fProgressStatus) {
      Error("TVirtualPacketizer", "No progress status");
      return;
   }
   fTotalEntries = 0;
   fValid = kTRUE;
   fStop = kFALSE;
   fFailedPackets = 0;

   // Performance monitoring
   TTime tnow = gSystem->Now();
   fStartTime = Long_t(tnow);
   SetBit(TVirtualPacketizer::kIsInitializing);
   ResetBit(TVirtualPacketizer::kIsDone);
   fInitTime = 0;
   fProcTime = 0;
   fTimeUpdt = -1.;

   // Init circularity ntple for performance calculations
   fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb");
   fCircN = 10;
   TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
   fCircProg->SetCircular(fCircN);

   // Init progress timer
   Long_t period = 500;
   TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
   fProgress = new TTimer;
   fProgress->SetObject(this);
   fProgress->Start(period, kFALSE);

   // Whether to send estimated values for the progress info
   TString estopt;
   TProof::GetParameter(input, "PROOF_RateEstimation", estopt);
   if (estopt.IsNull()) {
      // Parse option from the env
      estopt = gEnv->GetValue("Proof.RateEstimation", "");
   }
   fUseEstOpt = kEstOff;
   if (estopt == "current")
      fUseEstOpt = kEstCurrent;
   else if (estopt == "average")
      fUseEstOpt = kEstAverage;
}

//______________________________________________________________________________
TVirtualPacketizer::~TVirtualPacketizer()
{
   // Destructor.

   SafeDelete(fCircProg);
   SafeDelete(fProgress);
   SafeDelete(fFailedPackets);
   fProgressStatus = 0; // belongs to the player
}

//______________________________________________________________________________
Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
{
   // Get entries.

   Long64_t entries;
   TFile *file = TFile::Open(e->GetFileName());

   if ( file->IsZombie() ) {
      Error("GetEntries","Cannot open file: %s (%s)",
            e->GetFileName(), strerror(file->GetErrno()) );
      return -1;
   }

   TDirectory *dirsave = gDirectory;
   if ( ! file->cd(e->GetDirectory()) ) {
      Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
      delete file;
      return -1;
   }
   TDirectory *dir = gDirectory;
   dirsave->cd();

   if ( tree ) {
      TKey *key = dir->GetKey(e->GetObjName());
      if ( key == 0 ) {
         Error("GetEntries","Cannot find tree \"%s\" in %s",
               e->GetObjName(), e->GetFileName() );
         delete file;
         return -1;
      }
      TTree *t = (TTree *) key->ReadObj();
      if ( t == 0 ) {
         // Error always reported?
         delete file;
         return -1;
      }
      entries = (Long64_t) t->GetEntries();
      delete t;

   } else {
      TList *keys = dir->GetListOfKeys();
      entries = keys->GetSize();
   }

   delete file;

   return entries;
}

//______________________________________________________________________________
TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave *, TMessage *)
{
   // Get next packet.

   AbstractMethod("GetNextPacket");
   return 0;
}

//______________________________________________________________________________
void TVirtualPacketizer::StopProcess(Bool_t /*abort*/)
{
   // Stop process.

   fStop = kTRUE;
}

//______________________________________________________________________________
TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
                                                  Long64_t first, Long64_t num)
{
   // Creates a new TDSetElement from from base packet starting from
   // the first entry with num entries.
   // The function returns a new created objects which have to be deleted.

   TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
                                         base->GetDirectory(), first, num);

   // create TDSetElements for all the friends of elem.
   TList *friends = base->GetListOfFriends();
   if (friends) {
      TIter nxf(friends);
      TPair *p = 0;
      while ((p = (TPair *) nxf())) {
         TDSetElement *fe = (TDSetElement *) p->Key();
         elem->AddFriend(new TDSetElement(fe->GetFileName(), fe->GetObjName(),
                                          fe->GetDirectory(), first, num),
                                         ((TObjString *)(p->Value()))->GetName());
      }
   }

   return elem;
}

//______________________________________________________________________________
Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
{
   // Send progress message to client.

   if (fProgress == 0 || TestBit(TVirtualPacketizer::kIsDone))
      return kFALSE; // timer stopped already or reports completed

   // Prepare progress info
   TTime tnow = gSystem->Now();
   Float_t now = (Float_t) (Long_t(tnow) - fStartTime) / (Double_t)1000.;
   Long64_t estent = GetEntriesProcessed();
   Long64_t estmb = GetBytesRead();

   // Times and counters
   Float_t evtrti = -1., mbrti = -1.;
   if (TestBit(TVirtualPacketizer::kIsInitializing)) {
      // Initialization
      fInitTime = now;
   } else {
      // Fill the reference as first
      if (fCircProg->GetEntries() <= 0) {
         fCircProg->Fill((Double_t)0., 0., 0.);
         // Best estimation of the init time
         fInitTime = (now + fInitTime) / 2.;
      }
      // Time between updates
      fTimeUpdt = now - fProcTime;
      // Update proc time
      fProcTime = now - fInitTime;
      // Estimated number of processed events
      GetEstEntriesProcessed(fProcTime, estent, estmb);
      Double_t evts = (Double_t) estent;
      Double_t mbs = (estmb > 0) ?  estmb / TMath::Power(2.,20.) : 0.; //--> MB
      // Good entry
      fCircProg->Fill((Double_t)fProcTime, evts, mbs);
      // Instantaneous rates (at least 5 reports)
      if (fCircProg->GetEntries() > 4) {
         Double_t *ar = fCircProg->GetArgs();
         fCircProg->GetEntry(0);
         Double_t dt = (Double_t)fProcTime - ar[0];
         evtrti = (dt > 0) ? (Float_t) (evts - ar[1]) / dt : -1. ;
         mbrti = (dt > 0) ? (Float_t) (mbs - ar[2]) / dt : -1. ;
         if (gPerfStats != 0)
            gPerfStats->RateEvent((Double_t)fProcTime, dt,
                                 (Long64_t) (evts - ar[1]),
                                 (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)));
      }

      // Final report only once (to correctly determine the proc time)
      if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
         SetBit(TVirtualPacketizer::kIsDone);
   }

   if (gProofServ) {

      // Message to be sent over
      TMessage m(kPROOF_PROGRESS);
      if (gProofServ->GetProtocol() > 11) {
         // Fill the message now
         m << fTotalEntries << estent << estmb << fInitTime << fProcTime
           << evtrti << mbrti;
      } else {
         // Old format
         m << fTotalEntries << GetEntriesProcessed();
      }
      // send message to client;
      gProofServ->GetSocket()->Send(m);

   } else {
      if (gProof && gProof->GetPlayer()) {
         // Log locally
         gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
                                       fInitTime, fProcTime, evtrti, mbrti);
      }
   }

   // Final report only once (to correctly determine the proc time)
   if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
      SetBit(TVirtualPacketizer::kIsDone);

   return kFALSE; // ignored?
}

//______________________________________________________________________________
void TVirtualPacketizer::SetInitTime()
{
   // Set the initialization time

   if (TestBit(TVirtualPacketizer::kIsInitializing)) {
      fInitTime = (Float_t) (Long_t(gSystem->Now()) - fStartTime) / (Double_t)1000.;
      ResetBit(TVirtualPacketizer::kIsInitializing);
   }
   PDB(kPacketizer,2)
      Info("SetInitTime","fInitTime: %f s", fInitTime);
}
 TVirtualPacketizer.cxx:1
 TVirtualPacketizer.cxx:2
 TVirtualPacketizer.cxx:3
 TVirtualPacketizer.cxx:4
 TVirtualPacketizer.cxx:5
 TVirtualPacketizer.cxx:6
 TVirtualPacketizer.cxx:7
 TVirtualPacketizer.cxx:8
 TVirtualPacketizer.cxx:9
 TVirtualPacketizer.cxx:10
 TVirtualPacketizer.cxx:11
 TVirtualPacketizer.cxx:12
 TVirtualPacketizer.cxx:13
 TVirtualPacketizer.cxx:14
 TVirtualPacketizer.cxx:15
 TVirtualPacketizer.cxx:16
 TVirtualPacketizer.cxx:17
 TVirtualPacketizer.cxx:18
 TVirtualPacketizer.cxx:19
 TVirtualPacketizer.cxx:20
 TVirtualPacketizer.cxx:21
 TVirtualPacketizer.cxx:22
 TVirtualPacketizer.cxx:23
 TVirtualPacketizer.cxx:24
 TVirtualPacketizer.cxx:25
 TVirtualPacketizer.cxx:26
 TVirtualPacketizer.cxx:27
 TVirtualPacketizer.cxx:28
 TVirtualPacketizer.cxx:29
 TVirtualPacketizer.cxx:30
 TVirtualPacketizer.cxx:31
 TVirtualPacketizer.cxx:32
 TVirtualPacketizer.cxx:33
 TVirtualPacketizer.cxx:34
 TVirtualPacketizer.cxx:35
 TVirtualPacketizer.cxx:36
 TVirtualPacketizer.cxx:37
 TVirtualPacketizer.cxx:38
 TVirtualPacketizer.cxx:39
 TVirtualPacketizer.cxx:40
 TVirtualPacketizer.cxx:41
 TVirtualPacketizer.cxx:42
 TVirtualPacketizer.cxx:43
 TVirtualPacketizer.cxx:44
 TVirtualPacketizer.cxx:45
 TVirtualPacketizer.cxx:46
 TVirtualPacketizer.cxx:47
 TVirtualPacketizer.cxx:48
 TVirtualPacketizer.cxx:49
 TVirtualPacketizer.cxx:50
 TVirtualPacketizer.cxx:51
 TVirtualPacketizer.cxx:52
 TVirtualPacketizer.cxx:53
 TVirtualPacketizer.cxx:54
 TVirtualPacketizer.cxx:55
 TVirtualPacketizer.cxx:56
 TVirtualPacketizer.cxx:57
 TVirtualPacketizer.cxx:58
 TVirtualPacketizer.cxx:59
 TVirtualPacketizer.cxx:60
 TVirtualPacketizer.cxx:61
 TVirtualPacketizer.cxx:62
 TVirtualPacketizer.cxx:63
 TVirtualPacketizer.cxx:64
 TVirtualPacketizer.cxx:65
 TVirtualPacketizer.cxx:66
 TVirtualPacketizer.cxx:67
 TVirtualPacketizer.cxx:68
 TVirtualPacketizer.cxx:69
 TVirtualPacketizer.cxx:70
 TVirtualPacketizer.cxx:71
 TVirtualPacketizer.cxx:72
 TVirtualPacketizer.cxx:73
 TVirtualPacketizer.cxx:74
 TVirtualPacketizer.cxx:75
 TVirtualPacketizer.cxx:76
 TVirtualPacketizer.cxx:77
 TVirtualPacketizer.cxx:78
 TVirtualPacketizer.cxx:79
 TVirtualPacketizer.cxx:80
 TVirtualPacketizer.cxx:81
 TVirtualPacketizer.cxx:82
 TVirtualPacketizer.cxx:83
 TVirtualPacketizer.cxx:84
 TVirtualPacketizer.cxx:85
 TVirtualPacketizer.cxx:86
 TVirtualPacketizer.cxx:87
 TVirtualPacketizer.cxx:88
 TVirtualPacketizer.cxx:89
 TVirtualPacketizer.cxx:90
 TVirtualPacketizer.cxx:91
 TVirtualPacketizer.cxx:92
 TVirtualPacketizer.cxx:93
 TVirtualPacketizer.cxx:94
 TVirtualPacketizer.cxx:95
 TVirtualPacketizer.cxx:96
 TVirtualPacketizer.cxx:97
 TVirtualPacketizer.cxx:98
 TVirtualPacketizer.cxx:99
 TVirtualPacketizer.cxx:100
 TVirtualPacketizer.cxx:101
 TVirtualPacketizer.cxx:102
 TVirtualPacketizer.cxx:103
 TVirtualPacketizer.cxx:104
 TVirtualPacketizer.cxx:105
 TVirtualPacketizer.cxx:106
 TVirtualPacketizer.cxx:107
 TVirtualPacketizer.cxx:108
 TVirtualPacketizer.cxx:109
 TVirtualPacketizer.cxx:110
 TVirtualPacketizer.cxx:111
 TVirtualPacketizer.cxx:112
 TVirtualPacketizer.cxx:113
 TVirtualPacketizer.cxx:114
 TVirtualPacketizer.cxx:115
 TVirtualPacketizer.cxx:116
 TVirtualPacketizer.cxx:117
 TVirtualPacketizer.cxx:118
 TVirtualPacketizer.cxx:119
 TVirtualPacketizer.cxx:120
 TVirtualPacketizer.cxx:121
 TVirtualPacketizer.cxx:122
 TVirtualPacketizer.cxx:123
 TVirtualPacketizer.cxx:124
 TVirtualPacketizer.cxx:125
 TVirtualPacketizer.cxx:126
 TVirtualPacketizer.cxx:127
 TVirtualPacketizer.cxx:128
 TVirtualPacketizer.cxx:129
 TVirtualPacketizer.cxx:130
 TVirtualPacketizer.cxx:131
 TVirtualPacketizer.cxx:132
 TVirtualPacketizer.cxx:133
 TVirtualPacketizer.cxx:134
 TVirtualPacketizer.cxx:135
 TVirtualPacketizer.cxx:136
 TVirtualPacketizer.cxx:137
 TVirtualPacketizer.cxx:138
 TVirtualPacketizer.cxx:139
 TVirtualPacketizer.cxx:140
 TVirtualPacketizer.cxx:141
 TVirtualPacketizer.cxx:142
 TVirtualPacketizer.cxx:143
 TVirtualPacketizer.cxx:144
 TVirtualPacketizer.cxx:145
 TVirtualPacketizer.cxx:146
 TVirtualPacketizer.cxx:147
 TVirtualPacketizer.cxx:148
 TVirtualPacketizer.cxx:149
 TVirtualPacketizer.cxx:150
 TVirtualPacketizer.cxx:151
 TVirtualPacketizer.cxx:152
 TVirtualPacketizer.cxx:153
 TVirtualPacketizer.cxx:154
 TVirtualPacketizer.cxx:155
 TVirtualPacketizer.cxx:156
 TVirtualPacketizer.cxx:157
 TVirtualPacketizer.cxx:158
 TVirtualPacketizer.cxx:159
 TVirtualPacketizer.cxx:160
 TVirtualPacketizer.cxx:161
 TVirtualPacketizer.cxx:162
 TVirtualPacketizer.cxx:163
 TVirtualPacketizer.cxx:164
 TVirtualPacketizer.cxx:165
 TVirtualPacketizer.cxx:166
 TVirtualPacketizer.cxx:167
 TVirtualPacketizer.cxx:168
 TVirtualPacketizer.cxx:169
 TVirtualPacketizer.cxx:170
 TVirtualPacketizer.cxx:171
 TVirtualPacketizer.cxx:172
 TVirtualPacketizer.cxx:173
 TVirtualPacketizer.cxx:174
 TVirtualPacketizer.cxx:175
 TVirtualPacketizer.cxx:176
 TVirtualPacketizer.cxx:177
 TVirtualPacketizer.cxx:178
 TVirtualPacketizer.cxx:179
 TVirtualPacketizer.cxx:180
 TVirtualPacketizer.cxx:181
 TVirtualPacketizer.cxx:182
 TVirtualPacketizer.cxx:183
 TVirtualPacketizer.cxx:184
 TVirtualPacketizer.cxx:185
 TVirtualPacketizer.cxx:186
 TVirtualPacketizer.cxx:187
 TVirtualPacketizer.cxx:188
 TVirtualPacketizer.cxx:189
 TVirtualPacketizer.cxx:190
 TVirtualPacketizer.cxx:191
 TVirtualPacketizer.cxx:192
 TVirtualPacketizer.cxx:193
 TVirtualPacketizer.cxx:194
 TVirtualPacketizer.cxx:195
 TVirtualPacketizer.cxx:196
 TVirtualPacketizer.cxx:197
 TVirtualPacketizer.cxx:198
 TVirtualPacketizer.cxx:199
 TVirtualPacketizer.cxx:200
 TVirtualPacketizer.cxx:201
 TVirtualPacketizer.cxx:202
 TVirtualPacketizer.cxx:203
 TVirtualPacketizer.cxx:204
 TVirtualPacketizer.cxx:205
 TVirtualPacketizer.cxx:206
 TVirtualPacketizer.cxx:207
 TVirtualPacketizer.cxx:208
 TVirtualPacketizer.cxx:209
 TVirtualPacketizer.cxx:210
 TVirtualPacketizer.cxx:211
 TVirtualPacketizer.cxx:212
 TVirtualPacketizer.cxx:213
 TVirtualPacketizer.cxx:214
 TVirtualPacketizer.cxx:215
 TVirtualPacketizer.cxx:216
 TVirtualPacketizer.cxx:217
 TVirtualPacketizer.cxx:218
 TVirtualPacketizer.cxx:219
 TVirtualPacketizer.cxx:220
 TVirtualPacketizer.cxx:221
 TVirtualPacketizer.cxx:222
 TVirtualPacketizer.cxx:223
 TVirtualPacketizer.cxx:224
 TVirtualPacketizer.cxx:225
 TVirtualPacketizer.cxx:226
 TVirtualPacketizer.cxx:227
 TVirtualPacketizer.cxx:228
 TVirtualPacketizer.cxx:229
 TVirtualPacketizer.cxx:230
 TVirtualPacketizer.cxx:231
 TVirtualPacketizer.cxx:232
 TVirtualPacketizer.cxx:233
 TVirtualPacketizer.cxx:234
 TVirtualPacketizer.cxx:235
 TVirtualPacketizer.cxx:236
 TVirtualPacketizer.cxx:237
 TVirtualPacketizer.cxx:238
 TVirtualPacketizer.cxx:239
 TVirtualPacketizer.cxx:240
 TVirtualPacketizer.cxx:241
 TVirtualPacketizer.cxx:242
 TVirtualPacketizer.cxx:243
 TVirtualPacketizer.cxx:244
 TVirtualPacketizer.cxx:245
 TVirtualPacketizer.cxx:246
 TVirtualPacketizer.cxx:247
 TVirtualPacketizer.cxx:248
 TVirtualPacketizer.cxx:249
 TVirtualPacketizer.cxx:250
 TVirtualPacketizer.cxx:251
 TVirtualPacketizer.cxx:252
 TVirtualPacketizer.cxx:253
 TVirtualPacketizer.cxx:254
 TVirtualPacketizer.cxx:255
 TVirtualPacketizer.cxx:256
 TVirtualPacketizer.cxx:257
 TVirtualPacketizer.cxx:258
 TVirtualPacketizer.cxx:259
 TVirtualPacketizer.cxx:260
 TVirtualPacketizer.cxx:261
 TVirtualPacketizer.cxx:262
 TVirtualPacketizer.cxx:263
 TVirtualPacketizer.cxx:264
 TVirtualPacketizer.cxx:265
 TVirtualPacketizer.cxx:266
 TVirtualPacketizer.cxx:267
 TVirtualPacketizer.cxx:268
 TVirtualPacketizer.cxx:269
 TVirtualPacketizer.cxx:270
 TVirtualPacketizer.cxx:271
 TVirtualPacketizer.cxx:272
 TVirtualPacketizer.cxx:273
 TVirtualPacketizer.cxx:274
 TVirtualPacketizer.cxx:275
 TVirtualPacketizer.cxx:276
 TVirtualPacketizer.cxx:277
 TVirtualPacketizer.cxx:278
 TVirtualPacketizer.cxx:279
 TVirtualPacketizer.cxx:280
 TVirtualPacketizer.cxx:281
 TVirtualPacketizer.cxx:282
 TVirtualPacketizer.cxx:283
 TVirtualPacketizer.cxx:284
 TVirtualPacketizer.cxx:285
 TVirtualPacketizer.cxx:286
 TVirtualPacketizer.cxx:287
 TVirtualPacketizer.cxx:288
 TVirtualPacketizer.cxx:289
 TVirtualPacketizer.cxx:290
 TVirtualPacketizer.cxx:291
 TVirtualPacketizer.cxx:292
 TVirtualPacketizer.cxx:293
 TVirtualPacketizer.cxx:294
 TVirtualPacketizer.cxx:295
 TVirtualPacketizer.cxx:296
 TVirtualPacketizer.cxx:297
 TVirtualPacketizer.cxx:298
 TVirtualPacketizer.cxx:299
 TVirtualPacketizer.cxx:300
 TVirtualPacketizer.cxx:301
 TVirtualPacketizer.cxx:302
 TVirtualPacketizer.cxx:303
 TVirtualPacketizer.cxx:304
 TVirtualPacketizer.cxx:305
 TVirtualPacketizer.cxx:306
 TVirtualPacketizer.cxx:307
 TVirtualPacketizer.cxx:308
 TVirtualPacketizer.cxx:309
 TVirtualPacketizer.cxx:310
 TVirtualPacketizer.cxx:311
 TVirtualPacketizer.cxx:312
 TVirtualPacketizer.cxx:313