// @(#)root/proof:$Id$
// Author:

/*************************************************************************
 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProofNodes                                                          //
//                                                                      //
// PROOF worker node information                                        //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TProofNodes.h"
#include "TProof.h"
#include "TList.h"
#include "TMap.h"
#include "TObjString.h"

ClassImp(TProofNodes)

//______________________________________________________________________________
TProofNodes::TProofNodes(TProof* proof)
   : fProof(proof),
     fNodes(0),
     fActiveNodes(0),
     fMaxWrksNode(-1),
     fMinWrksNode(-1),
     fNNodes(0),
     fNWrks(0),
     fNActiveWrks(0),
     fNCores(0)
{
   // Constructor: remember the PROOF instance, start with empty maps and
   // zeroed counters (-1 for the min/max workers per node), then fill
   // everything in with a first call to Build().

   Build();
}

//______________________________________________________________________________
TProofNodes::~TProofNodes()
{
   // Destructor: release the two maps allocated in Build().

   // NOTE(review): only TCollection-level ownership is requested here;
   // confirm whether the TList values stored in the maps also require
   // TMap::SetOwnerValue()/SetOwnerKeyValue() to be released.
   if (fNodes) {
      fNodes->SetOwner(kTRUE);
      SafeDelete(fNodes);
   }
   // The map of active nodes is created together with fNodes in Build()
   // and was previously never deleted (memory leak).
   if (fActiveNodes) {
      fActiveNodes->SetOwner(kTRUE);
      SafeDelete(fActiveNodes);
   }
}

//______________________________________________________________________________
void TProofNodes::Build()
{
   // Description: Build the node map, whose values are lists of the workers
   //              found on each node, then derive from it the counters
   //              (nodes, workers, active workers, cores, min/max workers
   //              per node) and the map of active workers per node.
   // Input: Nothing
   // Return: Nothing
   // Note: if the PROOF instance is undefined or invalid a warning is issued
   //       and the current maps and counters are left untouched.

   if (!fProof || !fProof->IsValid()) {
      Warning("Build", "the PROOF instance is undefined or invalid! Cannot continue");
      return;
   }

   // Start from scratch: drop the map created by a previous call, if any
   if (fNodes){
      fNodes->SetOwner(kTRUE);
      SafeDelete(fNodes);
   }
   fNodes = new TMap;
   fNodes->SetOwner(kTRUE);

   // Group the workers by the name reported by TSlaveInfo::GetName()
   TList *slaves = fProof->GetListOfSlaveInfos();
   TIter nxtslave(slaves);
   TSlaveInfo *si = 0;
   TList *node = 0;
   TPair *pair = 0;
   while ((si = (TSlaveInfo *)(nxtslave()))) {
      // The original si's are owned by the member fSlaveInfo of fProof:
      // store clones so that the per-node lists can own their content
      TSlaveInfo *si_copy = (TSlaveInfo *)(si->Clone());
      if (!(pair = (TPair *) fNodes->FindObject(si->GetName()))) {
         // First worker seen with this name: create the list for the node
         node = new TList;
         node->SetOwner(kTRUE);
         node->SetName(si_copy->GetName());
         node->Add(si_copy);
         fNodes->Add(new TObjString(si->GetName()), node);
      } else {
         node = (TList *) pair->Value();
         node->Add(si_copy);
      }
   }

   // Update counters and create the active nodes map
   if (fActiveNodes){
      fActiveNodes->SetOwner(kTRUE);
      SafeDelete(fActiveNodes);
   }
   fActiveNodes = new TMap;
   fActiveNodes->SetOwner(kTRUE);
   TList *actnode = 0;
   fMaxWrksNode = -1;
   fMinWrksNode = -1;
   fNNodes = 0;
   fNWrks = 0;
   fNActiveWrks = 0;
   // The core counter is accumulated below with '+=': it must be reset like
   // the other counters, otherwise every rebuild adds the cores once more
   fNCores = 0;
   TIter nxk(fNodes);
   TObject *key = 0;
   while ((key = nxk()) != 0) {
      node = dynamic_cast<TList *>(fNodes->GetValue(key));
      if (node) {
         fNNodes++;
         // Number of cores: taken from the first worker in the node's list
         si = (TSlaveInfo *) node->First();
         fNCores += si->fSysInfo.fCpus;
         // Number of workers
         fNWrks += node->GetSize();
         // Track the smallest / largest number of workers per node
         // (-1 means 'not yet set')
         if (fMinWrksNode == -1 || (node->GetSize() < fMinWrksNode)) {
            fMinWrksNode = node->GetSize();
         }
         if (fMaxWrksNode == -1 || (node->GetSize() > fMaxWrksNode)) {
            fMaxWrksNode = node->GetSize();
         }
         // Collect clones of the active workers in the active nodes map
         TIter nxw(node);
         while ((si = (TSlaveInfo *) nxw())) {
            if (si->fStatus == TSlaveInfo::kActive) {
               fNActiveWrks++;
               TSlaveInfo *si_copy = (TSlaveInfo *)(si->Clone());
               actnode = dynamic_cast<TList *>(fActiveNodes->GetValue(key));
               if (actnode) {
                  actnode->Add(si_copy);
               } else {
                  actnode = new TList;
                  actnode->SetOwner(kTRUE);
                  actnode->SetName(si_copy->GetName());
                  actnode->Add(si_copy);
                  fActiveNodes->Add(new TObjString(si->GetName()), actnode);
               }
            }
         }
      } else {
         Warning("Build", "could not get list for node '%s'", key->GetName());
      }
   }

   // Done
   return;
}

//______________________________________________________________________________
Int_t TProofNodes::ActivateWorkers(Int_t nwrks)
{
   // Description: Activate 'nwrks' workers; calls TProof::SetParallel and,
   //              when it returns a positive value, rebuilds the internal
   //              lists
   // Input: number of workers
   // Return: the value returned by TProof::SetParallel (a warning is issued
   //         when it is positive but differs from 'nwrks');
   //         -1 if the PROOF instance is undefined or invalid.

   // Build() tolerates a missing or invalid PROOF instance, so this object
   // may exist without a usable fProof: do not dereference it blindly
   if (!fProof || !fProof->IsValid()) {
      Error("ActivateWorkers", "the PROOF instance is undefined or invalid! Cannot continue");
      return -1;
   }

   Int_t nw = fProof->SetParallel(nwrks);
   if (nw > 0) {
      if (nw != nwrks)
         Warning("ActivateWorkers", "requested %d got %d", nwrks, nw);
      // Refresh the maps and counters to reflect the new set of workers
      Build();
   }
   return nw;
}

//______________________________________________________________________________
Int_t TProofNodes::ActivateWorkers(const char *workers)
{
   // Description: Activate the same number of workers on all nodes.
   // Input: workers: string of the form "nx" where non-negative integer n
   //                 is the number of worker on each node to be activated.
   // Return: The number of active workers per node when the operation is
   //         successful.
   //         <0 otherwise (undefined or invalid PROOF instance, wrongly
   //         formatted argument, failed (de)activation request, or a node
   //         ending up with a number of active workers different from n).

   // Build() tolerates a missing or invalid PROOF instance, so this object
   // may exist without a usable fProof: fail explicitly instead of
   // dereferencing it or reporting success without having done anything
   if (!fProof || !fProof->IsValid()) {
      Error("ActivateWorkers", "the PROOF instance is undefined or invalid! Cannot continue");
      return -1;
   }

   TString toactivate;
   TString todeactivate;

   // The TProof::ActivateWorker/TProof::DeactivateWorker functions were fixed /
   // improved starting with protocol version 33
   Bool_t protocol33 = kTRUE;
   if (fProof->GetRemoteProtocol() < 33 || fProof->GetClientProtocol() < 33) {
      protocol33 = kFALSE;
      // This resets the lists to avoid the problem fixed in protocol 33
      fProof->SetParallel(0);
   }

   //Make sure worker list is up-to-date
   Build();

   // Drop the trailing 'x' and check that what is left is a plain number
   TString sworkers = TString(workers).Strip(TString::kTrailing, 'x');
   if (!sworkers.IsDigit()) {
      Error("ActivateWorkers", "wrongly formatted argument: %s - cannot continue", workers);
      return -1;
   }
   Int_t nworkersnode = sworkers.Atoi();
   Int_t ret = nworkersnode;
   TSlaveInfo *si = 0;
   TList *node = 0;
   TObject *key = 0;

   // On each node the first 'nworkersnode' workers of the list must end up
   // active, all the following ones inactive. With protocol >= 33 the
   // ordinals are collected and sent in one request per action after the
   // loop; otherwise each worker is (de)activated right away.
   TIter nxk(fNodes);
   while ((key = nxk()) != 0) {
      if ((node = dynamic_cast<TList *>(fNodes->GetValue(key)))) {
         TIter nxtworker(node);
         Int_t nactiveworkers = 0;
         while ((si = (TSlaveInfo *)(nxtworker()))) {
            if (nactiveworkers < nworkersnode) {
               if (si->fStatus == TSlaveInfo::kNotActive) {
                  if (protocol33) {
                     toactivate += TString::Format("%s,", si->GetOrdinal());
                  } else {
                     fProof->ActivateWorker(si->GetOrdinal());
                  }
               }
               // Counted whatever its current status is
               nactiveworkers++;
            } else {
               if (si->fStatus == TSlaveInfo::kActive) {
                  if (protocol33) {
                     todeactivate += TString::Format("%s,", si->GetOrdinal());
                  } else {
                     fProof->DeactivateWorker(si->GetOrdinal());
                  }
               }
            }
         }
      } else {
         Warning("ActivateWorkers", "could not get list for node '%s'", key->GetName());
      }
   }

   // Send the collected requests (these strings are filled only with
   // protocol >= 33), removing the trailing comma left by the loop above
   if (!todeactivate.IsNull()) {
      todeactivate.Remove(TString::kTrailing, ',');
      if (fProof->DeactivateWorker(todeactivate) < 0) ret = -1;
   }
   if (!toactivate.IsNull()) {
      toactivate.Remove(TString::kTrailing, ',');
      if (fProof->ActivateWorker(toactivate) < 0) ret = -1;
   }
   if (ret < 0) {
      Warning("ActivateWorkers", "could not get the requested number of workers per node (%d)",
                                  nworkersnode);
      return ret;
   }

   // Rebuild
   Build();

   // Build() destroys fNodes so we need to re-create the iterator, resetting is not enough ...
   // Check that every node now has exactly the requested number of active workers
   TIter nxkn(fNodes);
   while ((key = nxkn()) != 0) {
      if ((node = dynamic_cast<TList *>(fNodes->GetValue(key)))) {
         TIter nxtworker(node);
         Int_t nactiveworkers = 0;
         while ((si = (TSlaveInfo *)(nxtworker()))) {
            if (si->fStatus == TSlaveInfo::kActive) nactiveworkers++;
         }
         if (nactiveworkers != nworkersnode) {
            Warning("ActivateWorkers", "only %d (out of %d requested) workers "
                                       "were activated on node %s",
                                       nactiveworkers, nworkersnode, node->GetName());
            ret = -1;
         }
      } else {
         Warning("ActivateWorkers", "could not get list for node '%s'", key->GetName());
      }
   }

   // Done
   return ret;
}

//______________________________________________________________________________
void TProofNodes::Print(Option_t* option) const
{
   // Description: Print the list of workers of every node in the node map,
   //              forwarding 'option' to the Print method of each list.
   //              A warning is issued for keys without an associated list.

   TIter nextKey(fNodes);
   for (TObject *nodeKey = nextKey(); nodeKey != 0; nodeKey = nextKey()) {
      TList *workers = dynamic_cast<TList *>(fNodes->GetValue(nodeKey));
      if (!workers) {
         Warning("Print", "could not get list for node '%s'", nodeKey->GetName());
         continue;
      }
      workers->Print(option);
   }
}
 TProofNodes.cxx:1
 TProofNodes.cxx:2
 TProofNodes.cxx:3
 TProofNodes.cxx:4
 TProofNodes.cxx:5
 TProofNodes.cxx:6
 TProofNodes.cxx:7
 TProofNodes.cxx:8
 TProofNodes.cxx:9
 TProofNodes.cxx:10
 TProofNodes.cxx:11
 TProofNodes.cxx:12
 TProofNodes.cxx:13
 TProofNodes.cxx:14
 TProofNodes.cxx:15
 TProofNodes.cxx:16
 TProofNodes.cxx:17
 TProofNodes.cxx:18
 TProofNodes.cxx:19
 TProofNodes.cxx:20
 TProofNodes.cxx:21
 TProofNodes.cxx:22
 TProofNodes.cxx:23
 TProofNodes.cxx:24
 TProofNodes.cxx:25
 TProofNodes.cxx:26
 TProofNodes.cxx:27
 TProofNodes.cxx:28
 TProofNodes.cxx:29
 TProofNodes.cxx:30
 TProofNodes.cxx:31
 TProofNodes.cxx:32
 TProofNodes.cxx:33
 TProofNodes.cxx:34
 TProofNodes.cxx:35
 TProofNodes.cxx:36
 TProofNodes.cxx:37
 TProofNodes.cxx:38
 TProofNodes.cxx:39
 TProofNodes.cxx:40
 TProofNodes.cxx:41
 TProofNodes.cxx:42
 TProofNodes.cxx:43
 TProofNodes.cxx:44
 TProofNodes.cxx:45
 TProofNodes.cxx:46
 TProofNodes.cxx:47
 TProofNodes.cxx:48
 TProofNodes.cxx:49
 TProofNodes.cxx:50
 TProofNodes.cxx:51
 TProofNodes.cxx:52
 TProofNodes.cxx:53
 TProofNodes.cxx:54
 TProofNodes.cxx:55
 TProofNodes.cxx:56
 TProofNodes.cxx:57
 TProofNodes.cxx:58
 TProofNodes.cxx:59
 TProofNodes.cxx:60
 TProofNodes.cxx:61
 TProofNodes.cxx:62
 TProofNodes.cxx:63
 TProofNodes.cxx:64
 TProofNodes.cxx:65
 TProofNodes.cxx:66
 TProofNodes.cxx:67
 TProofNodes.cxx:68
 TProofNodes.cxx:69
 TProofNodes.cxx:70
 TProofNodes.cxx:71
 TProofNodes.cxx:72
 TProofNodes.cxx:73
 TProofNodes.cxx:74
 TProofNodes.cxx:75
 TProofNodes.cxx:76
 TProofNodes.cxx:77
 TProofNodes.cxx:78
 TProofNodes.cxx:79
 TProofNodes.cxx:80
 TProofNodes.cxx:81
 TProofNodes.cxx:82
 TProofNodes.cxx:83
 TProofNodes.cxx:84
 TProofNodes.cxx:85
 TProofNodes.cxx:86
 TProofNodes.cxx:87
 TProofNodes.cxx:88
 TProofNodes.cxx:89
 TProofNodes.cxx:90
 TProofNodes.cxx:91
 TProofNodes.cxx:92
 TProofNodes.cxx:93
 TProofNodes.cxx:94
 TProofNodes.cxx:95
 TProofNodes.cxx:96
 TProofNodes.cxx:97
 TProofNodes.cxx:98
 TProofNodes.cxx:99
 TProofNodes.cxx:100
 TProofNodes.cxx:101
 TProofNodes.cxx:102
 TProofNodes.cxx:103
 TProofNodes.cxx:104
 TProofNodes.cxx:105
 TProofNodes.cxx:106
 TProofNodes.cxx:107
 TProofNodes.cxx:108
 TProofNodes.cxx:109
 TProofNodes.cxx:110
 TProofNodes.cxx:111
 TProofNodes.cxx:112
 TProofNodes.cxx:113
 TProofNodes.cxx:114
 TProofNodes.cxx:115
 TProofNodes.cxx:116
 TProofNodes.cxx:117
 TProofNodes.cxx:118
 TProofNodes.cxx:119
 TProofNodes.cxx:120
 TProofNodes.cxx:121
 TProofNodes.cxx:122
 TProofNodes.cxx:123
 TProofNodes.cxx:124
 TProofNodes.cxx:125
 TProofNodes.cxx:126
 TProofNodes.cxx:127
 TProofNodes.cxx:128
 TProofNodes.cxx:129
 TProofNodes.cxx:130
 TProofNodes.cxx:131
 TProofNodes.cxx:132
 TProofNodes.cxx:133
 TProofNodes.cxx:134
 TProofNodes.cxx:135
 TProofNodes.cxx:136
 TProofNodes.cxx:137
 TProofNodes.cxx:138
 TProofNodes.cxx:139
 TProofNodes.cxx:140
 TProofNodes.cxx:141
 TProofNodes.cxx:142
 TProofNodes.cxx:143
 TProofNodes.cxx:144
 TProofNodes.cxx:145
 TProofNodes.cxx:146
 TProofNodes.cxx:147
 TProofNodes.cxx:148
 TProofNodes.cxx:149
 TProofNodes.cxx:150
 TProofNodes.cxx:151
 TProofNodes.cxx:152
 TProofNodes.cxx:153
 TProofNodes.cxx:154
 TProofNodes.cxx:155
 TProofNodes.cxx:156
 TProofNodes.cxx:157
 TProofNodes.cxx:158
 TProofNodes.cxx:159
 TProofNodes.cxx:160
 TProofNodes.cxx:161
 TProofNodes.cxx:162
 TProofNodes.cxx:163
 TProofNodes.cxx:164
 TProofNodes.cxx:165
 TProofNodes.cxx:166
 TProofNodes.cxx:167
 TProofNodes.cxx:168
 TProofNodes.cxx:169
 TProofNodes.cxx:170
 TProofNodes.cxx:171
 TProofNodes.cxx:172
 TProofNodes.cxx:173
 TProofNodes.cxx:174
 TProofNodes.cxx:175
 TProofNodes.cxx:176
 TProofNodes.cxx:177
 TProofNodes.cxx:178
 TProofNodes.cxx:179
 TProofNodes.cxx:180
 TProofNodes.cxx:181
 TProofNodes.cxx:182
 TProofNodes.cxx:183
 TProofNodes.cxx:184
 TProofNodes.cxx:185
 TProofNodes.cxx:186
 TProofNodes.cxx:187
 TProofNodes.cxx:188
 TProofNodes.cxx:189
 TProofNodes.cxx:190
 TProofNodes.cxx:191
 TProofNodes.cxx:192
 TProofNodes.cxx:193
 TProofNodes.cxx:194
 TProofNodes.cxx:195
 TProofNodes.cxx:196
 TProofNodes.cxx:197
 TProofNodes.cxx:198
 TProofNodes.cxx:199
 TProofNodes.cxx:200
 TProofNodes.cxx:201
 TProofNodes.cxx:202
 TProofNodes.cxx:203
 TProofNodes.cxx:204
 TProofNodes.cxx:205
 TProofNodes.cxx:206
 TProofNodes.cxx:207
 TProofNodes.cxx:208
 TProofNodes.cxx:209
 TProofNodes.cxx:210
 TProofNodes.cxx:211
 TProofNodes.cxx:212
 TProofNodes.cxx:213
 TProofNodes.cxx:214
 TProofNodes.cxx:215
 TProofNodes.cxx:216
 TProofNodes.cxx:217
 TProofNodes.cxx:218
 TProofNodes.cxx:219
 TProofNodes.cxx:220
 TProofNodes.cxx:221
 TProofNodes.cxx:222
 TProofNodes.cxx:223
 TProofNodes.cxx:224
 TProofNodes.cxx:225
 TProofNodes.cxx:226
 TProofNodes.cxx:227
 TProofNodes.cxx:228
 TProofNodes.cxx:229
 TProofNodes.cxx:230
 TProofNodes.cxx:231
 TProofNodes.cxx:232
 TProofNodes.cxx:233
 TProofNodes.cxx:234
 TProofNodes.cxx:235
 TProofNodes.cxx:236
 TProofNodes.cxx:237
 TProofNodes.cxx:238
 TProofNodes.cxx:239
 TProofNodes.cxx:240
 TProofNodes.cxx:241
 TProofNodes.cxx:242
 TProofNodes.cxx:243
 TProofNodes.cxx:244
 TProofNodes.cxx:245
 TProofNodes.cxx:246
 TProofNodes.cxx:247
 TProofNodes.cxx:248
 TProofNodes.cxx:249
 TProofNodes.cxx:250
 TProofNodes.cxx:251
 TProofNodes.cxx:252
 TProofNodes.cxx:253
 TProofNodes.cxx:254
 TProofNodes.cxx:255
 TProofNodes.cxx:256
 TProofNodes.cxx:257
 TProofNodes.cxx:258
 TProofNodes.cxx:259
 TProofNodes.cxx:260
 TProofNodes.cxx:261
 TProofNodes.cxx:262
 TProofNodes.cxx:263
 TProofNodes.cxx:264
 TProofNodes.cxx:265
 TProofNodes.cxx:266
 TProofNodes.cxx:267
 TProofNodes.cxx:268
 TProofNodes.cxx:269
 TProofNodes.cxx:270
 TProofNodes.cxx:271
 TProofNodes.cxx:272
 TProofNodes.cxx:273
 TProofNodes.cxx:274
 TProofNodes.cxx:275
 TProofNodes.cxx:276
 TProofNodes.cxx:277
 TProofNodes.cxx:278
 TProofNodes.cxx:279
 TProofNodes.cxx:280
 TProofNodes.cxx:281
 TProofNodes.cxx:282
 TProofNodes.cxx:283
 TProofNodes.cxx:284