Logo ROOT   6.08/07
Reference Guide
XrdProofWorker.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: Gerardo Ganis June 2007
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // XrdProofWorker //
15 // //
16 // Authors: G. Ganis, CERN, 2007 //
17 // //
18 // Class with information about a potential worker. //
19 // A list of instances of this class is built using the config file or //
20 // or the information collected from the resource discoverers. //
21 // //
22 //////////////////////////////////////////////////////////////////////////
23 
24 #include <time.h>
25 
26 #include "XrdProofdAux.h"
27 #include "XrdProofWorker.h"
28 #include "XrdProofdProofServ.h"
30 #include "XpdSysDNS.h"
31 #include "XProofProtocol.h"
32 
33 // Tracing utilities
34 #include "XrdProofdTrace.h"
35 
36 ////////////////////////////////////////////////////////////////////////////////
37 /// Constructor from a config file-like string
38 
40  : fExport(256), fType('W'), fPort(-1), fPerfIdx(100), fNwrks(1), fActive(1)
41 {
42  fMutex = new XrdSysRecMutex;
43 
44  // Make sure we got something to parse
45  if (!str || strlen(str) <= 0)
46  return;
47 
48  // The actual work is done by Reset()
49  Reset(str);
50 }
51 ////////////////////////////////////////////////////////////////////////////////
52 /// Destructor
53 
55 {
56  SafeDel(fMutex);
57 }
58 
59 ////////////////////////////////////////////////////////////////////////////////
60 /// Copy constructor
61 
63 {
64  fMutex = new XrdSysRecMutex;
65  fExport = w.fExport;
66  fType = w.fType;
67  fHost = w.fHost;
68  fPort = w.fPort;
69  fPerfIdx = w.fPerfIdx;
70  fImage = w.fImage;
71  fWorkDir = w.fWorkDir;
72  fMsd = w.fMsd;
73  fId = w.fId;
74  fNwrks = w.fNwrks;
75  fOrd = "";
76  fActive = 1;
77 }
78 
79 ////////////////////////////////////////////////////////////////////////////////
80 /// Set content from a config file-like string
81 
82 void XrdProofWorker::Reset(const char *str)
83 {
84  XPDLOC(NMGR, "Worker::Reset")
85 
86 
87  // Reinit vars
88  fExport = "";
89  fType = 'W';
90  fHost = "";
92  fPerfIdx = 100;
93  fImage = "";
94  fWorkDir = "";
95  fMsd = "";
96  fId = "";
97  fNwrks = 1;
98  fOrd = "";
99 
100  // Make sure we got something to parse
101  if (!str || strlen(str) <= 0)
102  return;
103 
104  // Tokenize the string
105  XrdOucString s(str);
106 
107  // First token is the type
108  XrdOucString tok;
109  XrdOucString typestr = "master|submaster|worker|slave";
110  int from = s.tokenize(tok, 0, ' ');
111  if (from == STR_NPOS || typestr.find(tok) == STR_NPOS)
112  return;
113  if (tok == "submaster")
114  fType = 'S';
115  else if (tok == "master")
116  fType = 'M';
117 
118  // Next token is the user@host:port string, make sure it is a full qualified host name
119  if ((from = s.tokenize(tok, from, ' ')) == STR_NPOS)
120  return;
121  XrdClientUrlInfo ui(tok.c_str());
122  // Take the user name, if specified
123  fUser = ui.User;
124  char *err;
125  char *fullHostName = XrdSysDNS::getHostName((char *)ui.Host.c_str(), &err);
126  if (!fullHostName || (fullHostName && !strcmp(fullHostName, "0.0.0.0"))) {
127  TRACE(XERR, "DNS could not resolve '" << ui.Host << "'");
128  SafeFree(fullHostName);
129  return;
130  }
131  fHost = fullHostName;
132  SafeFree(fullHostName);
133  // Take the port, if specified
134  fPort = (ui.Port > 0) ? ui.Port : fPort;
135 
136  // and then the remaining options
137  while ((from = s.tokenize(tok, from, ' ')) != STR_NPOS) {
138  if (tok.beginswith("workdir=")) {
139  // Working dir
140  tok.replace("workdir=", "");
141  fWorkDir = tok;
142  } else if (tok.beginswith("image=")) {
143  // Image
144  tok.replace("image=", "");
145  fImage = tok;
146  } else if (tok.beginswith("msd=")) {
147  // Mass storage domain
148  tok.replace("msd=", "");
149  fMsd = tok;
150  } else if (tok.beginswith("port=")) {
151  // Port
152  tok.replace("port=", "");
153  fPort = strtol(tok.c_str(), (char **)0, 10);
154  } else if (tok.beginswith("perf=")) {
155  // Performance index
156  tok.replace("perf=", "");
157  fPerfIdx = strtol(tok.c_str(), (char **)0, 10);
158  } else if (!tok.beginswith("repeat=")) {
159  // Unknown
160  TRACE(XERR, "ignoring unknown option '" << tok << "'");
161  }
162  }
163 }
164 
165 ////////////////////////////////////////////////////////////////////////////////
166 /// Check compatibility of host with this instance.
167 /// return 1 if compatible.
168 
169 bool XrdProofWorker::Matches(const char *host)
170 {
171  return ((fHost.matches(host)) ? 1 : 0);
172 }
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 /// Set content from a config file-like string
176 
178 {
179  // Check if 'wrk' is on the same node that 'this'; used to find the unique
180  // worker nodes.
181  // return 1 if the node is the same.
182 
183  if (wrk) {
184  // Check Host names
185  if (wrk->fHost == fHost) {
186  // Check ports
187  int pa = (fPort > 0) ? fPort : XPD_DEF_PORT;
188  int pb = (wrk->fPort > 0) ? wrk->fPort : XPD_DEF_PORT;
189  if (pa == pb)
190  return 1;
191  }
192  }
193 
194  // They do not match
195  return 0;
196 }
197 
198 ////////////////////////////////////////////////////////////////////////////////
199 /// Export current content in a form understood by parsing algorithms
200 /// inside the PROOF session, i.e.
201 /// <type>|<user@host>|<port>|<ord>|-|<perfidx>|<img>|<workdir>|<msd>
202 
203 const char *XrdProofWorker::Export(const char *ord)
204 {
205  XPDLOC(NMGR, "Worker::Export")
206 
207  fExport = fType;
208 
209  // Add user@host
210  fExport += '|' ;
211  if (fUser.length() > 0) {
212  fExport += fUser;
213  fExport += "@";
214  }
215  fExport += fHost;
216 
217  // Add port
218  if (fPort > 0) {
219  fExport += '|' ;
220  fExport += fPort;
221  } else
222  fExport += "|-";
223 
224  // Ordinal only if passed as argument
225  if (ord && strlen(ord) > 0) {
226  // Add ordinal
227  fExport += '|' ;
228  fExport += ord;
229  } else if (fOrd.length() > 0) {
230  // Add ordinal
231  fExport += '|' ;
232  fExport += fOrd;
233  } else {
234  // No ordinal at this level
235  fExport += "|-";
236  }
237  // ID at this level
238  fExport += "|-";
239 
240  // Add performance index
241  fExport += '|' ;
242  fExport += fPerfIdx;
243 
244  // Add image
245  if (fImage.length() > 0) {
246  fExport += '|' ;
247  fExport += fImage;
248  } else
249  fExport += "|-";
250 
251  // Add workdir
252  if (fWorkDir.length() > 0) {
253  fExport += '|' ;
254  fExport += fWorkDir;
255  } else
256  fExport += "|-";
257 
258  // Add mass storage domain
259  if (fMsd.length() > 0) {
260  fExport += '|' ;
261  fExport += fMsd;
262  } else
263  fExport += "|-";
264 
265  // Add Nwrks
266  fExport += "|-|" ;
267  fExport += fNwrks;
268 
269  // We are done
270  TRACE(DBG, "sending: " << fExport);
271  return fExport.c_str();
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Calculate the number of workers existing on this node which are
276 /// currently running.
277 /// TODO: optimally, one could contact the packetizer and count the
278 /// opened files.
279 
281 {
282  int myRunning = 0;
283  std::list<XrdProofdProofServ *>::iterator iter;
285  for (iter = fProofServs.begin(); iter != fProofServs.end(); ++iter) {
286  if (*iter) {
287  if ((*iter)->Status() == kXPD_running)
288  myRunning++;
289  }
290  }
291  return myRunning;
292 }
293 
294 ////////////////////////////////////////////////////////////////////////////////
295 /// Merge session objects from the other worker object in order to merge all
296 /// the objects in only one. This was added to support hybrid satatically and
297 /// dinamically Bonjour workers discovery.
298 
300 {
301  std::list<XrdProofdProofServ *>::const_iterator iter;
303  for (iter = other.fProofServs.begin(); iter != other.fProofServs.end(); ++iter) {
304  this->fProofServs.push_back(*iter);
305  }
306 }
307 
308 ////////////////////////////////////////////////////////////////////////////////
309 /// Sort ascendingly the list according to the comparing algorithm defined
310 /// by 'f'; 'f' should return 'true' if 'rhs' > 'lhs'.
311 /// This is implemented because on Solaris where std::list::sort() does not
312 /// support an alternative comparison algorithm.
313 
314 void XrdProofWorker::Sort(std::list<XrdProofWorker *> *lst,
315  bool (*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
316 {
317  // Check argument
318  if (!lst)
319  return;
320 
321  // If empty or just one element, nothing to do
322  if (lst->size() < 2)
323  return;
324 
325  // Fill a temp array with the current status
326  XrdProofWorker **ta = new XrdProofWorker *[lst->size() - 1];
327  std::list<XrdProofWorker *>::iterator i = lst->begin();
328  i++; // skip master
329  int n = 0;
330  for (; i != lst->end(); ++i)
331  ta[n++] = *i;
332 
333  // Now start the loops
334  XrdProofWorker *tmp = 0;
335  bool notyet = 1;
336  int jold = 0;
337  while (notyet) {
338  int j = jold;
339  while (j < n - 1) {
340  if (f(ta[j], ta[j+1]))
341  break;
342  j++;
343  }
344  if (j >= n - 1) {
345  notyet = 0;
346  } else {
347  jold = j + 1;
348  XPDSWAP(ta[j], ta[j+1], tmp);
349  int k = j;
350  while (k > 0) {
351  if (!f(ta[k], ta[k-1])) {
352  XPDSWAP(ta[k], ta[k-1], tmp);
353  } else {
354  break;
355  }
356  k--;
357  }
358  }
359  }
360 
361  // Empty the original list
362  XrdProofWorker *mst = lst->front();
363  lst->clear();
364  lst->push_back(mst);
365 
366  // Fill it again
367  while (n--)
368  lst->push_back(ta[n]);
369 
370  // Clean up
371  delete[] ta;
372 }
const char * Export(const char *ord=0)
Export current content in a form understood by parsing algorithms inside the PROOF session...
std::list< XrdProofdProofServ * > fProofServs
#define SafeDel(x)
Definition: XrdProofdAux.h:335
virtual ~XrdProofWorker()
Destructor.
XrdOucString fId
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
#define TRACE(Flag, Args)
Definition: TGHtml.h:124
XrdOucString fImage
XrdSysRecMutex * fMutex
void Reset(const char *str)
Set content from a config file-like string.
XrdOucString fUser
while((line_o=(*lineIt)()))
Definition: HLFactory.cxx:464
XrdOucString fMsd
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
bool Matches(const char *host)
Check compatibility of host with this instance.
#define XPDLOC(d, x)
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
void MergeProofServs(const XrdProofWorker &other)
Merge session objects from the other worker object in order to merge all the objects in only one...
#define XPDSWAP(a, b, t)
Definition: XrdProofdAux.h:364
PyObject * fType
double f(double x)
#define SafeFree(x)
Definition: XrdProofdAux.h:341
XrdProofWorker(const char *str=0)
Constructor from a config file-like string.
static void Sort(std::list< XrdProofWorker *> *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by &#39;f&#39;; &#39;f&#39; should return &#39;tru...
#define XPD_DEF_PORT
XrdOucString fOrd
XrdOucString fWorkDir
const Int_t n
Definition: legend1.C:16
if(line.BeginsWith("/*"))
Definition: HLFactory.cxx:443
XrdOucString fExport
XrdOucString fHost