// @(#)root/proof:$Id$
// Author: Fons Rademakers   13/02/97

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProof                                                               //
//                                                                      //
// This class controls a Parallel ROOT Facility, PROOF, cluster.        //
// It fires the slave servers, it keeps track of how many slaves are    //
// running, it keeps track of the slaves running status, it broadcasts  //
// messages to all slaves, it collects results, etc.                    //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TProofCondor.h"

#include "TCondor.h"
#include "TList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TProofNodeInfo.h"
#include "TProofResourcesStatic.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TString.h"
#include "TTimer.h"

ClassImp(TProofCondor)

//______________________________________________________________________________
TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
                           const char *confdir, Int_t loglevel,
                           const char *, TProofMgr *mgr)
  : fCondor(0), fTimer(0)
{
   // Start proof using condor

   // Default initializations
   InitMembers();

   // This may be needed during init
   fManager = mgr;

   fUrl = TUrl(masterurl);

   if (!conffile || !conffile[0]) {
      conffile = kPROOF_ConfFile;
   } else if (!strncasecmp(conffile, "condor:", 7)) {
      conffile+=7;
   }

   if (!confdir  || !confdir[0]) {
      confdir = kPROOF_ConfDir;
   }

   Init(masterurl, conffile, confdir, loglevel);
}

//______________________________________________________________________________
TProofCondor::~TProofCondor()
{
   // Clean up Condor PROOF environment.

   SafeDelete(fCondor);
   SafeDelete(fTimer);
}

//______________________________________________________________________________
Bool_t TProofCondor::StartSlaves(Bool_t)
{
   // Setup Condor workers using dynamic information

   fCondor = new TCondor;
   TString jobad = GetJobAd();

   fImage = fCondor->GetImage(gSystem->HostName());
   if (fImage.Length() == 0) {
      Error("StartSlaves", "Empty Condor image found for system %s",
            gSystem->HostName());
      return kFALSE;
   }

   TList claims;
   if (fConfFile.IsNull()) {
      // startup all slaves if no config file given
      TList *condorclaims = fCondor->Claim(9999, jobad);
      TIter nextclaim(condorclaims);
      while (TObject *o = nextclaim()) claims.Add(o);
   } else {
      // parse config file
      TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
      fConfFile = resources->GetFileName(); // Update the global file name (with path)
      PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());

      // Get all workers
      TList *workerList = resources->GetWorkers();
      if (workerList->GetSize() == 0) {
         Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
         return kFALSE;
      }

      // check for valid slave lines and claim condor nodes
      Int_t ord = 0;

      // Loop over all workers and start them
      TListIter next(workerList);
      TObject *to;
      TProofNodeInfo *worker;
      int nSlavesDone = 0;
      while ((to = next())) {
         // Get the next worker from the list
         worker = (TProofNodeInfo *)to;

         // Read back worker node info
         const Char_t *image = worker->GetImage().Data();
         const Char_t *workdir = worker->GetWorkDir().Data();
         Int_t perfidx = worker->GetPerfIndex();

         gSystem->Sleep(10 /* ms */);
         TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
         if (csl) {
            csl->fPerfIdx = perfidx;
            csl->fImage = image;
            csl->fWorkDir = gSystem->ExpandPathName(workdir);
            TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
            csl->fOrdinal = fullord.Data();
            claims.Add(csl);
            ord++;
         }

         // Notify claim creation
         nSlavesDone++;
         TMessage m(kPROOF_SERVERSTARTED);
         m << TString("Creating COD Claim") << workerList->GetSize()
         << nSlavesDone << (csl != 0);
         gProofServ->GetSocket()->Send(m);

      } // end while (worker loop)

      // Cleanup
      delete resources;
      resources = 0;
   } // end else (parse config file)

   Long_t delay = 500; // timer delay 0.5s
   Int_t ntries = 20; // allow 20 tries (must be > 1 for algorithm to work)
   Int_t trial = 1;
   Int_t idx = 0;

   int nClaims = claims.GetSize();
   int nClaimsDone = 0;
   while (claims.GetSize() > 0) {
      TCondorSlave* c = 0;

      // Get Condor Slave
      if (trial == 1) {
         c = dynamic_cast<TCondorSlave*>(claims.At(idx));
      } else {
         TPair *p = dynamic_cast<TPair*>(claims.At(idx));
         if (p) {
            TTimer *t = dynamic_cast<TTimer*>(p->Value());
            if (t) {
               // wait remaining time
               Long64_t wait = t->GetAbsTime()-gSystem->Now();
               if (wait > 0) gSystem->Sleep((UInt_t)wait);
               c = dynamic_cast<TCondorSlave*>(p->Key());
            }
         }
      }

      // create slave
      TSlave *slave = 0;
      if (c) slave = CreateSlave(Form("%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
                                               c->fPerfIdx, c->fImage, c->fWorkDir);

      // add slave to appropriate list
      if (trial < ntries) {
         if (slave && slave->IsValid()) {
            fSlaves->Add(slave);
            if (trial == 1) {
               claims.Remove(c);
            } else {
               TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
               if (p) {
                  TTimer *xt = dynamic_cast<TTimer*>(p->Value());
                  if (xt) delete xt;
                  delete p;
               }
            }
            nClaimsDone++;
            TMessage m(kPROOF_SERVERSTARTED);
            m << TString("Opening connections to workers") << nClaims
               << nClaimsDone << kTRUE;
            gProofServ->GetSocket()->Send(m);
         } else if (slave) {
            if (trial == 1) {
               TTimer* timer = new TTimer(delay);
               TPair *p = new TPair(c, timer);
               claims.RemoveAt(idx);
               claims.AddAt(p, idx);
            } else {
               TPair *p = dynamic_cast<TPair*>(claims.At(idx));
               if (p && p->Value()) {
                  TTimer *xt = dynamic_cast<TTimer*>(p->Value());
                  if (xt) xt->Reset();
               }
            }
            delete slave;
            idx++;
         } else {
            Warning("StartSlaves", "could not create TSlave object!");
         }
      } else {
         if (slave) {
            fSlaves->Add(slave);
            TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
            if (p && p->Value()) {
               TTimer *xt = dynamic_cast<TTimer*>(p->Value());
               delete xt;
            }
            if (p) delete p;

            nClaimsDone++;
            TMessage m(kPROOF_SERVERSTARTED);
            m << TString("Opening connections to workers") << nClaims
               << nClaimsDone << slave->IsValid();
            gProofServ->GetSocket()->Send(m);
         } else {
            Warning("StartSlaves", "could not create TSlave object!");
         }
      }

      if (idx>=claims.GetSize()) {
         trial++;
         idx = 0;
      }
   }

   // Here we finalize the server startup: in this way the bulk
   // of remote operations are almost parallelized
   TIter nxsl(fSlaves);
   TSlave *sl = 0;
   int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
   while ((sl = (TSlave *) nxsl())) {

      // Finalize setup of the server
      if (sl->IsValid()) {
         sl->SetupServ(TSlave::kSlave, 0);
      }

      if (sl->IsValid()) {
         fAllMonitor->Add(sl->GetSocket());
      } else {
         fBadSlaves->Add(sl);
      }

      // Notify end of startup operations
      nSlavesDone++;
      TMessage m(kPROOF_SERVERSTARTED);
      Bool_t wrkvalid = sl->IsValid() ? kTRUE : kFALSE;
      m << TString("Setting up worker servers") << nSlavesTotal
         << nSlavesDone << wrkvalid;
      gProofServ->GetSocket()->Send(m);
   }

   return kTRUE;
}

//______________________________________________________________________________
void TProofCondor::SetActive(Bool_t active)
{
   // Suspend or resume PROOF via Condor.

   if (fTimer == 0) {
      fTimer = new TTimer();
   }
   if (active) {
      PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
      fTimer->Stop();
      if (fCondor->GetState() == TCondor::kSuspended)
         fCondor->Resume();
   } else {
#if 1
      return; // don't suspend for the moment
#else
      Int_t delay = 60000; // milli seconds
      PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
                          delay, delay + Long64_t(gSystem->Now()));
      fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
      fTimer->Start(10000, kTRUE); // single shot
#endif
   }
}

//______________________________________________________________________________
TString TProofCondor::GetJobAd()
{
   // Get job Ad

   TString ad;

   ad = "JobUniverse = 5\n"; // vanilla
   ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
   ad += Form("Iwd = \"%s\"\n", gSystem->TempDirectory());
   ad += "In = \"/dev/null\"\n";
   ad += Form("Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
   ad += Form("Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
   ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());

   return ad;
}
 TProofCondor.cxx:1
 TProofCondor.cxx:2
 TProofCondor.cxx:3
 TProofCondor.cxx:4
 TProofCondor.cxx:5
 TProofCondor.cxx:6
 TProofCondor.cxx:7
 TProofCondor.cxx:8
 TProofCondor.cxx:9
 TProofCondor.cxx:10
 TProofCondor.cxx:11
 TProofCondor.cxx:12
 TProofCondor.cxx:13
 TProofCondor.cxx:14
 TProofCondor.cxx:15
 TProofCondor.cxx:16
 TProofCondor.cxx:17
 TProofCondor.cxx:18
 TProofCondor.cxx:19
 TProofCondor.cxx:20
 TProofCondor.cxx:21
 TProofCondor.cxx:22
 TProofCondor.cxx:23
 TProofCondor.cxx:24
 TProofCondor.cxx:25
 TProofCondor.cxx:26
 TProofCondor.cxx:27
 TProofCondor.cxx:28
 TProofCondor.cxx:29
 TProofCondor.cxx:30
 TProofCondor.cxx:31
 TProofCondor.cxx:32
 TProofCondor.cxx:33
 TProofCondor.cxx:34
 TProofCondor.cxx:35
 TProofCondor.cxx:36
 TProofCondor.cxx:37
 TProofCondor.cxx:38
 TProofCondor.cxx:39
 TProofCondor.cxx:40
 TProofCondor.cxx:41
 TProofCondor.cxx:42
 TProofCondor.cxx:43
 TProofCondor.cxx:44
 TProofCondor.cxx:45
 TProofCondor.cxx:46
 TProofCondor.cxx:47
 TProofCondor.cxx:48
 TProofCondor.cxx:49
 TProofCondor.cxx:50
 TProofCondor.cxx:51
 TProofCondor.cxx:52
 TProofCondor.cxx:53
 TProofCondor.cxx:54
 TProofCondor.cxx:55
 TProofCondor.cxx:56
 TProofCondor.cxx:57
 TProofCondor.cxx:58
 TProofCondor.cxx:59
 TProofCondor.cxx:60
 TProofCondor.cxx:61
 TProofCondor.cxx:62
 TProofCondor.cxx:63
 TProofCondor.cxx:64
 TProofCondor.cxx:65
 TProofCondor.cxx:66
 TProofCondor.cxx:67
 TProofCondor.cxx:68
 TProofCondor.cxx:69
 TProofCondor.cxx:70
 TProofCondor.cxx:71
 TProofCondor.cxx:72
 TProofCondor.cxx:73
 TProofCondor.cxx:74
 TProofCondor.cxx:75
 TProofCondor.cxx:76
 TProofCondor.cxx:77
 TProofCondor.cxx:78
 TProofCondor.cxx:79
 TProofCondor.cxx:80
 TProofCondor.cxx:81
 TProofCondor.cxx:82
 TProofCondor.cxx:83
 TProofCondor.cxx:84
 TProofCondor.cxx:85
 TProofCondor.cxx:86
 TProofCondor.cxx:87
 TProofCondor.cxx:88
 TProofCondor.cxx:89
 TProofCondor.cxx:90
 TProofCondor.cxx:91
 TProofCondor.cxx:92
 TProofCondor.cxx:93
 TProofCondor.cxx:94
 TProofCondor.cxx:95
 TProofCondor.cxx:96
 TProofCondor.cxx:97
 TProofCondor.cxx:98
 TProofCondor.cxx:99
 TProofCondor.cxx:100
 TProofCondor.cxx:101
 TProofCondor.cxx:102
 TProofCondor.cxx:103
 TProofCondor.cxx:104
 TProofCondor.cxx:105
 TProofCondor.cxx:106
 TProofCondor.cxx:107
 TProofCondor.cxx:108
 TProofCondor.cxx:109
 TProofCondor.cxx:110
 TProofCondor.cxx:111
 TProofCondor.cxx:112
 TProofCondor.cxx:113
 TProofCondor.cxx:114
 TProofCondor.cxx:115
 TProofCondor.cxx:116
 TProofCondor.cxx:117
 TProofCondor.cxx:118
 TProofCondor.cxx:119
 TProofCondor.cxx:120
 TProofCondor.cxx:121
 TProofCondor.cxx:122
 TProofCondor.cxx:123
 TProofCondor.cxx:124
 TProofCondor.cxx:125
 TProofCondor.cxx:126
 TProofCondor.cxx:127
 TProofCondor.cxx:128
 TProofCondor.cxx:129
 TProofCondor.cxx:130
 TProofCondor.cxx:131
 TProofCondor.cxx:132
 TProofCondor.cxx:133
 TProofCondor.cxx:134
 TProofCondor.cxx:135
 TProofCondor.cxx:136
 TProofCondor.cxx:137
 TProofCondor.cxx:138
 TProofCondor.cxx:139
 TProofCondor.cxx:140
 TProofCondor.cxx:141
 TProofCondor.cxx:142
 TProofCondor.cxx:143
 TProofCondor.cxx:144
 TProofCondor.cxx:145
 TProofCondor.cxx:146
 TProofCondor.cxx:147
 TProofCondor.cxx:148
 TProofCondor.cxx:149
 TProofCondor.cxx:150
 TProofCondor.cxx:151
 TProofCondor.cxx:152
 TProofCondor.cxx:153
 TProofCondor.cxx:154
 TProofCondor.cxx:155
 TProofCondor.cxx:156
 TProofCondor.cxx:157
 TProofCondor.cxx:158
 TProofCondor.cxx:159
 TProofCondor.cxx:160
 TProofCondor.cxx:161
 TProofCondor.cxx:162
 TProofCondor.cxx:163
 TProofCondor.cxx:164
 TProofCondor.cxx:165
 TProofCondor.cxx:166
 TProofCondor.cxx:167
 TProofCondor.cxx:168
 TProofCondor.cxx:169
 TProofCondor.cxx:170
 TProofCondor.cxx:171
 TProofCondor.cxx:172
 TProofCondor.cxx:173
 TProofCondor.cxx:174
 TProofCondor.cxx:175
 TProofCondor.cxx:176
 TProofCondor.cxx:177
 TProofCondor.cxx:178
 TProofCondor.cxx:179
 TProofCondor.cxx:180
 TProofCondor.cxx:181
 TProofCondor.cxx:182
 TProofCondor.cxx:183
 TProofCondor.cxx:184
 TProofCondor.cxx:185
 TProofCondor.cxx:186
 TProofCondor.cxx:187
 TProofCondor.cxx:188
 TProofCondor.cxx:189
 TProofCondor.cxx:190
 TProofCondor.cxx:191
 TProofCondor.cxx:192
 TProofCondor.cxx:193
 TProofCondor.cxx:194
 TProofCondor.cxx:195
 TProofCondor.cxx:196
 TProofCondor.cxx:197
 TProofCondor.cxx:198
 TProofCondor.cxx:199
 TProofCondor.cxx:200
 TProofCondor.cxx:201
 TProofCondor.cxx:202
 TProofCondor.cxx:203
 TProofCondor.cxx:204
 TProofCondor.cxx:205
 TProofCondor.cxx:206
 TProofCondor.cxx:207
 TProofCondor.cxx:208
 TProofCondor.cxx:209
 TProofCondor.cxx:210
 TProofCondor.cxx:211
 TProofCondor.cxx:212
 TProofCondor.cxx:213
 TProofCondor.cxx:214
 TProofCondor.cxx:215
 TProofCondor.cxx:216
 TProofCondor.cxx:217
 TProofCondor.cxx:218
 TProofCondor.cxx:219
 TProofCondor.cxx:220
 TProofCondor.cxx:221
 TProofCondor.cxx:222
 TProofCondor.cxx:223
 TProofCondor.cxx:224
 TProofCondor.cxx:225
 TProofCondor.cxx:226
 TProofCondor.cxx:227
 TProofCondor.cxx:228
 TProofCondor.cxx:229
 TProofCondor.cxx:230
 TProofCondor.cxx:231
 TProofCondor.cxx:232
 TProofCondor.cxx:233
 TProofCondor.cxx:234
 TProofCondor.cxx:235
 TProofCondor.cxx:236
 TProofCondor.cxx:237
 TProofCondor.cxx:238
 TProofCondor.cxx:239
 TProofCondor.cxx:240
 TProofCondor.cxx:241
 TProofCondor.cxx:242
 TProofCondor.cxx:243
 TProofCondor.cxx:244
 TProofCondor.cxx:245
 TProofCondor.cxx:246
 TProofCondor.cxx:247
 TProofCondor.cxx:248
 TProofCondor.cxx:249
 TProofCondor.cxx:250
 TProofCondor.cxx:251
 TProofCondor.cxx:252
 TProofCondor.cxx:253
 TProofCondor.cxx:254
 TProofCondor.cxx:255
 TProofCondor.cxx:256
 TProofCondor.cxx:257
 TProofCondor.cxx:258
 TProofCondor.cxx:259
 TProofCondor.cxx:260
 TProofCondor.cxx:261
 TProofCondor.cxx:262
 TProofCondor.cxx:263
 TProofCondor.cxx:264
 TProofCondor.cxx:265
 TProofCondor.cxx:266
 TProofCondor.cxx:267
 TProofCondor.cxx:268
 TProofCondor.cxx:269
 TProofCondor.cxx:270
 TProofCondor.cxx:271
 TProofCondor.cxx:272
 TProofCondor.cxx:273
 TProofCondor.cxx:274
 TProofCondor.cxx:275
 TProofCondor.cxx:276
 TProofCondor.cxx:277
 TProofCondor.cxx:278
 TProofCondor.cxx:279
 TProofCondor.cxx:280
 TProofCondor.cxx:281
 TProofCondor.cxx:282
 TProofCondor.cxx:283
 TProofCondor.cxx:284
 TProofCondor.cxx:285
 TProofCondor.cxx:286
 TProofCondor.cxx:287
 TProofCondor.cxx:288
 TProofCondor.cxx:289
 TProofCondor.cxx:290
 TProofCondor.cxx:291
 TProofCondor.cxx:292
 TProofCondor.cxx:293
 TProofCondor.cxx:294
 TProofCondor.cxx:295
 TProofCondor.cxx:296
 TProofCondor.cxx:297
 TProofCondor.cxx:298
 TProofCondor.cxx:299
 TProofCondor.cxx:300
 TProofCondor.cxx:301
 TProofCondor.cxx:302
 TProofCondor.cxx:303
 TProofCondor.cxx:304
 TProofCondor.cxx:305
 TProofCondor.cxx:306
 TProofCondor.cxx:307
 TProofCondor.cxx:308
 TProofCondor.cxx:309
 TProofCondor.cxx:310
 TProofCondor.cxx:311
 TProofCondor.cxx:312
 TProofCondor.cxx:313
 TProofCondor.cxx:314
 TProofCondor.cxx:315
 TProofCondor.cxx:316
 TProofCondor.cxx:317
 TProofCondor.cxx:318
 TProofCondor.cxx:319
 TProofCondor.cxx:320
 TProofCondor.cxx:321