Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
XrdProofWorker.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: Gerardo Ganis June 2007
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// XrdProofWorker //
15// //
16// Authors: G. Ganis, CERN, 2007 //
17// //
18// Class with information about a potential worker. //
19// A list of instances of this class is built using the config file //
20// or the information collected from the resource discoverers. //
21// //
22//////////////////////////////////////////////////////////////////////////
23
24#include <time.h>
25
26#include "XrdProofdAux.h"
27#include "XrdProofWorker.h"
28#include "XrdProofdProofServ.h"
30#include "XpdSysDNS.h"
31#include "XProofProtocol.h"
32
33// Tracing utilities
34#include "XrdProofdTrace.h"
35
36////////////////////////////////////////////////////////////////////////////////
37/// Constructor from a config file-like string
38
40 : fExport(256), fType('W'), fPort(-1), fPerfIdx(100), fNwrks(1), fActive(1)
41{
42 fMutex = new XrdSysRecMutex;
43
44 // Make sure we got something to parse
45 if (!str || strlen(str) <= 0)
46 return;
47
48 // The actual work is done by Reset()
49 Reset(str);
50}
51////////////////////////////////////////////////////////////////////////////////
52/// Destructor
53
55{
57}
58
59////////////////////////////////////////////////////////////////////////////////
60/// Copy constructor
61
63{
64 fMutex = new XrdSysRecMutex;
65 fExport = w.fExport;
66 fType = w.fType;
67 fHost = w.fHost;
68 fPort = w.fPort;
70 fImage = w.fImage;
72 fMsd = w.fMsd;
73 fId = w.fId;
74 fNwrks = w.fNwrks;
75 fOrd = "";
76 fActive = 1;
77}
78
79////////////////////////////////////////////////////////////////////////////////
80/// Set content from a config file-like string
81
82void XrdProofWorker::Reset(const char *str)
83{
84 XPDLOC(NMGR, "Worker::Reset")
85
86
87 // Reinit vars
88 fExport = "";
89 fType = 'W';
90 fHost = "";
92 fPerfIdx = 100;
93 fImage = "";
94 fWorkDir = "";
95 fMsd = "";
96 fId = "";
97 fNwrks = 1;
98 fOrd = "";
99
100 // Make sure we got something to parse
101 if (!str || strlen(str) <= 0)
102 return;
103
104 // Tokenize the string
105 XrdOucString s(str);
106
107 // First token is the type
108 XrdOucString tok;
109 XrdOucString typestr = "master|submaster|worker|slave";
110 int from = s.tokenize(tok, 0, ' ');
111 if (from == STR_NPOS || typestr.find(tok) == STR_NPOS)
112 return;
113 if (tok == "submaster")
114 fType = 'S';
115 else if (tok == "master")
116 fType = 'M';
117
118 // Next token is the user@host:port string, make sure it is a full qualified host name
119 if ((from = s.tokenize(tok, from, ' ')) == STR_NPOS)
120 return;
121 XrdClientUrlInfo ui(tok.c_str());
122 // Take the user name, if specified
123 fUser = ui.User;
124 char *err;
125 char *fullHostName = XrdSysDNS::getHostName((char *)ui.Host.c_str(), &err);
126 if (!fullHostName || (fullHostName && !strcmp(fullHostName, "0.0.0.0"))) {
127 TRACE(XERR, "DNS could not resolve '" << ui.Host << "'");
128 SafeFree(fullHostName);
129 return;
130 }
131 fHost = fullHostName;
132 SafeFree(fullHostName);
133 // Take the port, if specified
134 fPort = (ui.Port > 0) ? ui.Port : fPort;
135
136 // and then the remaining options
137 while ((from = s.tokenize(tok, from, ' ')) != STR_NPOS) {
138 if (tok.beginswith("workdir=")) {
139 // Working dir
140 tok.replace("workdir=", "");
141 fWorkDir = tok;
142 } else if (tok.beginswith("image=")) {
143 // Image
144 tok.replace("image=", "");
145 fImage = tok;
146 } else if (tok.beginswith("msd=")) {
147 // Mass storage domain
148 tok.replace("msd=", "");
149 fMsd = tok;
150 } else if (tok.beginswith("port=")) {
151 // Port
152 tok.replace("port=", "");
153 fPort = strtol(tok.c_str(), (char **)0, 10);
154 } else if (tok.beginswith("perf=")) {
155 // Performance index
156 tok.replace("perf=", "");
157 fPerfIdx = strtol(tok.c_str(), (char **)0, 10);
158 } else if (!tok.beginswith("repeat=")) {
159 // Unknown
160 TRACE(XERR, "ignoring unknown option '" << tok << "'");
161 }
162 }
163}
164
165////////////////////////////////////////////////////////////////////////////////
166/// Check compatibility of host with this instance.
167/// return 1 if compatible.
168
169bool XrdProofWorker::Matches(const char *host)
170{
171 return ((fHost.matches(host)) ? 1 : 0);
172}
173
174////////////////////////////////////////////////////////////////////////////////
175/// Check whether 'wrk' is on the same node (same host and port) as this instance
176
178{
179 // Check if 'wrk' is on the same node that 'this'; used to find the unique
180 // worker nodes.
181 // return 1 if the node is the same.
182
183 if (wrk) {
184 // Check Host names
185 if (wrk->fHost == fHost) {
186 // Check ports
187 int pa = (fPort > 0) ? fPort : XPD_DEF_PORT;
188 int pb = (wrk->fPort > 0) ? wrk->fPort : XPD_DEF_PORT;
189 if (pa == pb)
190 return 1;
191 }
192 }
193
194 // They do not match
195 return 0;
196}
197
198////////////////////////////////////////////////////////////////////////////////
199/// Export current content in a form understood by parsing algorithms
200/// inside the PROOF session, i.e.
201/// <type>|<user@host>|<port>|<ord>|-|<perfidx>|<img>|<workdir>|<msd>
202
203const char *XrdProofWorker::Export(const char *ord)
204{
205 XPDLOC(NMGR, "Worker::Export")
206
207 fExport = fType;
208
209 // Add user@host
210 fExport += '|' ;
211 if (fUser.length() > 0) {
212 fExport += fUser;
213 fExport += "@";
214 }
215 fExport += fHost;
216
217 // Add port
218 if (fPort > 0) {
219 fExport += '|' ;
220 fExport += fPort;
221 } else
222 fExport += "|-";
223
224 // Ordinal only if passed as argument
225 if (ord && strlen(ord) > 0) {
226 // Add ordinal
227 fExport += '|' ;
228 fExport += ord;
229 } else if (fOrd.length() > 0) {
230 // Add ordinal
231 fExport += '|' ;
232 fExport += fOrd;
233 } else {
234 // No ordinal at this level
235 fExport += "|-";
236 }
237 // ID at this level
238 fExport += "|-";
239
240 // Add performance index
241 fExport += '|' ;
242 fExport += fPerfIdx;
243
244 // Add image
245 if (fImage.length() > 0) {
246 fExport += '|' ;
247 fExport += fImage;
248 } else
249 fExport += "|-";
250
251 // Add workdir
252 if (fWorkDir.length() > 0) {
253 fExport += '|' ;
254 fExport += fWorkDir;
255 } else
256 fExport += "|-";
257
258 // Add mass storage domain
259 if (fMsd.length() > 0) {
260 fExport += '|' ;
261 fExport += fMsd;
262 } else
263 fExport += "|-";
264
265 // Add Nwrks
266 fExport += "|-|" ;
267 fExport += fNwrks;
268
269 // We are done
270 TRACE(DBG, "sending: " << fExport);
271 return fExport.c_str();
272}
273
274////////////////////////////////////////////////////////////////////////////////
275/// Calculate the number of workers existing on this node which are
276/// currently running.
277/// TODO: optimally, one could contact the packetizer and count the
278/// opened files.
279
281{
282 int myRunning = 0;
283 std::list<XrdProofdProofServ *>::iterator iter;
284 XrdSysMutexHelper mhp(fMutex);
285 for (iter = fProofServs.begin(); iter != fProofServs.end(); ++iter) {
286 if (*iter) {
287 if ((*iter)->Status() == kXPD_running)
288 myRunning++;
289 }
290 }
291 return myRunning;
292}
293
294////////////////////////////////////////////////////////////////////////////////
295/// Merge session objects from the other worker object in order to merge all
296/// the objects in only one.
297
299{
300 std::list<XrdProofdProofServ *>::const_iterator iter;
301 XrdSysMutexHelper mhp(fMutex);
302 for (iter = other.fProofServs.begin(); iter != other.fProofServs.end(); ++iter) {
303 this->fProofServs.push_back(*iter);
304 }
305}
306
307////////////////////////////////////////////////////////////////////////////////
308/// Sort ascendingly the list according to the comparing algorithm defined
309/// by 'f'; 'f' should return 'true' if 'rhs' > 'lhs'.
310/// This is implemented because on Solaris where std::list::sort() does not
311/// support an alternative comparison algorithm.
312
313void XrdProofWorker::Sort(std::list<XrdProofWorker *> *lst,
314 bool (*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
315{
316 // Check argument
317 if (!lst)
318 return;
319
320 // If empty or just one element, nothing to do
321 if (lst->size() < 2)
322 return;
323
324 // Fill a temp array with the current status
325 XrdProofWorker **ta = new XrdProofWorker *[lst->size() - 1];
326 std::list<XrdProofWorker *>::iterator i = lst->begin();
327 ++i; // skip master
328 int n = 0;
329 for (; i != lst->end(); ++i)
330 ta[n++] = *i;
331
332 // Now start the loops
333 XrdProofWorker *tmp = 0;
334 bool notyet = 1;
335 int jold = 0;
336 while (notyet) {
337 int j = jold;
338 while (j < n - 1) {
339 if (f(ta[j], ta[j+1]))
340 break;
341 j++;
342 }
343 if (j >= n - 1) {
344 notyet = 0;
345 } else {
346 jold = j + 1;
347 XPDSWAP(ta[j], ta[j+1], tmp);
348 int k = j;
349 while (k > 0) {
350 if (!f(ta[k], ta[k-1])) {
351 XPDSWAP(ta[k], ta[k-1], tmp);
352 } else {
353 break;
354 }
355 k--;
356 }
357 }
358 }
359
360 // Empty the original list
361 XrdProofWorker *mst = lst->front();
362 lst->clear();
363 lst->push_back(mst);
364
365 // Fill it again
366 while (n--)
367 lst->push_back(ta[n]);
368
369 // Clean up
370 delete[] ta;
371}
#define f(i)
Definition RSha256.hxx:104
#define TRACE(Flag, Args)
Definition TGHtml.h:121
@ kXPD_running
#define XPD_DEF_PORT
#define SafeDel(x)
#define SafeFree(x)
#define XPDSWAP(a, b, t)
#define XPDLOC(d, x)
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
void Reset(const char *str)
Set content from a config file-like string.
XrdOucString fOrd
XrdOucString fImage
bool Matches(const char *host)
Check compatibility of host with this instance.
XrdSysRecMutex * fMutex
XrdOucString fId
std::list< XrdProofdProofServ * > fProofServs
XrdOucString fWorkDir
XrdOucString fUser
XrdOucString fHost
void MergeProofServs(const XrdProofWorker &other)
Merge session objects from the other worker object in order to merge all the objects in only one.
XrdProofWorker(const char *str=0)
Constructor from a config file-like string.
static void Sort(std::list< XrdProofWorker * > *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
virtual ~XrdProofWorker()
Destructor.
XrdOucString fMsd
const char * Export(const char *ord=0)
Export current content in a form understood by parsing algorithms inside the PROOF session,...
XrdOucString fExport
const Int_t n
Definition legend1.C:16