29#  include "XrdNet/XrdNetAddr.hh" 
   31#include "Xrd/XrdBuffer.hh" 
   36#include "XrdOuc/XrdOucStream.hh" 
   37#include "XrdSys/XrdSysPlatform.hh" 
  107   std::list<XrdProofWorker *>::iterator 
w = 
fRegWorkers.begin();
 
 
  132   std::list<XrdProofWorker *>::iterator 
w = 
fWorkers.begin();
 
  138   XrdOucString mm(
"master ", 128);
 
  146      XPDERR(
"problems parsing file ");
 
  151   msg = (
rcf) ? 
"re-configuring" : 
"configuring";
 
  164               TRACE(
ALL, 
"PROOF config file will " <<
 
  169                  XPDERR(
"unable to find valid information in PROOF config file " <<
 
 
  216   if (
d->fName == 
"resource") {
 
  218   } 
else if (
d->fName == 
"adminreqto") {
 
  220   } 
else if (
d->fName == 
"worker") {
 
  224   TRACE(
XERR, 
"unknown directive: " << 
d->fName);
 
 
  234   list<XrdProofWorker *>::const_iterator iter, 
iter2;
 
  235   list<XrdProofWorker *>::iterator 
iter3; 
 
  250   for (iter = 
fNodes.begin(); iter != 
fNodes.end(); ++iter) {
 
  258      info[*iter].available = 0;
 
  260         if ((*iter)->Matches(*
iter2)) {
 
  261            info[*iter].available++;
 
  264      info[*iter].added = 0;
 
  266      if (
info[*iter].available > 1 && 
info[*iter].available < min)
 
  267         min = 
info[*iter].available;
 
  274   for (iter = 
fNodes.begin(); iter != 
fNodes.end(); ++iter) {
 
  275      if (
info[*iter].available > 1) {
 
  276         info[*iter].per_iteration = (
unsigned int)floor((
double)
info[*iter].available / (
double)min);
 
  278         info[*iter].per_iteration = 1;
 
  291      for (map<XrdProofWorker *, BalancerInfo>::iterator i = 
info.
begin(); i != 
info.
end(); ++i) {
 
  292         if (i->second.added < i->second.available) {
 
  295                                        (i->second.available - i->second.added));
 
  297            for (
unsigned int j = 0; 
j < 
to_add; 
j++) {
 
  300            i->second.added += 
to_add;
 
  323            if ((*iter2)->Matches(*
iter3)) {
 
  325               (*iter2)->MergeProofServs(*(*
iter3));
 
 
  374   if (!
strcmp(
"static", val)) {
 
  378      while ((val = cfg->GetWord()) && val[0]) {
 
  380         if (s.beginswith(
"ucfg:")) {
 
  382         } 
else if (s.beginswith(
"reload:")) {
 
  384         } 
else if (s.beginswith(
"dfltfallback:")) {
 
  385            fDfltFallback = (s.endswith(
"1") || s.endswith(
"yes")) ? 1 : 0;
 
  386         } 
else if (s.beginswith(
"wmx:")) {
 
  387         } 
else if (s.beginswith(
"selopt:")) {
 
 
  427   XrdOucString 
wrd(cfg->GetWord());
 
  428   if (
wrd.length() > 0) {
 
  431      char rest[2048] = {0};
 
  432      cfg->GetRest((
char *)&rest[0], 2048);
 
  435      if (
wrd == 
"master" || 
wrd == 
"node") {
 
  438         if (
pw->fHost.beginswith(
"localhost") ||
 
  448         int ir = 
line.find(
"repeat=");
 
  451            r.erase(
r.find(
' '));
 
  459            if (
mline.IsValid()) {
 
  460               TRACE(
DBG, 
"found multi-line with: " << 
mline.N() << 
" tokens");
 
  461               for (
int i = 0; i < 
mline.N(); i++) {
 
 
  493   std::list<XrdProofWorker *>::iterator 
iw = 
fNodes.begin();
 
  496      if ((
w = *
iw) && 
w->fType != 
'M') {
 
  498         bool us = (((
w->fHost.find(
"localhost") != 
STR_NPOS ||
 
  500                    (
w->fPort == -1 || 
w->fPort == 
fMgr->
Port())) ? 1 : 0;
 
  505            XrdOucString 
u = (
w->fUser.length() > 0) ? 
w->fUser : 
usr;
 
  509            if (
w->fPort != -1) {
 
  526                  TRACE(
XERR, 
"problems marshalling request");
 
  530                  TRACE(
XERR, 
"problems sending ctrl-c request to server " << 
u);
 
  537            TRACE(
DBG, 
"broadcast request for ourselves: ignore");
 
 
  557   unsigned int nok = 0;
 
  561   std::list<XrdProofWorker *>::iterator 
iw = 
fNodes.begin();
 
  565      if ((
w = *
iw) && 
w->fType != 
'M') {
 
  567         bool us = (((
w->fHost.find(
"localhost") != 
STR_NPOS ||
 
  569                    (
w->fPort == -1 || 
w->fPort == 
fMgr->
Port())) ? 1 : 0;
 
  574            XrdOucString 
u = (
w->fUser.length() > 0) ? 
w->fUser : 
usr;
 
  578            if (
w->fPort != -1) {
 
  588               TRACE(
XERR, 
"problems sending request to " << 
u);
 
  595            TRACE(
DBG, 
"broadcast request for ourselves: ignore");
 
 
  614   XrdOucString buf = 
" Manager connection from ";
 
 
  662            notifymsg += 
"change-of-ROOT version request to ";
 
  667            buf = (
msg) ? (
const void *)
msg : buf;
 
  677            buf = (
msg) ? (
const void *)
msg : buf;
 
  687            buf = (
msg) ? (
const void *)
msg : buf;
 
  723         XrdOucString 
cmsg = 
"failure attempting connection to ";
 
 
  743   if (host && 
strlen(host) > 0) {
 
  745      if (
uu.Port <= 0) 
uu.Port = 1093;
 
  748      char *
fqn = XrdSysDNS::getHostName(
uu.Host.c_str());
 
  751      aNA.Set(
uu.Host.c_str());
 
  752      char *
fqn = (
char *) 
aNA.Name();
 
 
  782   kXR_int64 ofs = 
ntohll(
p->Request()->readbuf.ofs);
 
  783   int len = 
ntohl(
p->Request()->readbuf.len);
 
  789   int dlen = 
p->Request()->header.dlen;
 
  793   if (dlen > 0 && 
p->Argp()->buff) {
 
  794      file = 
new char[dlen+1];
 
  795      memcpy(file, 
p->Argp()->buff, dlen);
 
  799      if (
ui.Host.length() > 0) {
 
  803            memcpy(file, 
ui.File.c_str(), 
ui.File.length());
 
  804            file[
ui.File.length()] = 0;
 
  812         pattern = 
new char[
len + 1];
 
  816            pattern[i++] = file[
j++];
 
  820         TRACEP(
p, 
DBG, 
"grep operation " << 
grep << 
", pattern:" << pattern);
 
  823      emsg = 
"file name not found";
 
  830             ", pattern: " << pattern);
 
  832      TRACEP(
p, 
REQ, 
"file: " << file << 
", ofs: " << ofs << 
", len: " << 
len);
 
  850      if (
u.User.length() <= 0)
 
  860               XPDFORM(
emsg, 
"nothing found by 'grep' in %s, pattern: %s", 
filen, pattern);
 
  867                    (
local) ? 
"local file " : 
"remote file ", file);
 
  875            emsg = 
"nothing found in ";
 
  884      response->Send(buf, 
lout);
 
 
  906   if (file.length() <= 0 || file.find(
'*') == 
STR_NPOS) 
return 0;
 
  910   int isl = file.rfind(
'/');
 
  912      fn.assign(file, 
isl + 1, -1);
 
  913      dn.assign(file, 0, 
isl);
 
  930      if (!
strncmp(ent->d_name, 
".", 1) || !
strncmp(ent->d_name, 
"..", 2))
 
  934      if (
sent.matches(
fn.c_str()) > 0) 
break;
 
  940   if (
sent.length() > 0) {
 
 
  961   TRACE(
REQ, 
"file: " << path << 
", ofs: " << ofs << 
", len: " << 
len);
 
  964   if (!path || 
strlen(path) <= 0) {
 
  970   XrdOucString 
spath(path);
 
  972      TRACE(
XERR, 
"path cannot be resolved! (" << path << 
")");
 
  975   const char *file = 
spath.c_str();
 
  980      emsg = 
"could not open ";
 
  989      emsg = 
"could not get size of file with stat: errno: ";
 
  999   kXR_int64 start = ofs;
 
 1003   kXR_int64 end = 
fst + 
len;
 
 1013      emsg = 
"could not allocate enough memory on the heap: errno: ";
 
 1028      while ((
nr = read(fd, buf + pos, left)) < 0 && 
errno == 
EINTR)
 
 1039   } 
while (
nr > 0 && left > 0);
 
 
 1060                                       const char *
pat, 
int &
len, 
int opt)
 
 1065   TRACE(
REQ, 
"file: " << path << 
", pat: " << 
pat << 
", len: " << 
len);
 
 1068   if (!path || 
strlen(path) <= 0) {
 
 1074   XrdOucString 
spath(path);
 
 1076      TRACE(
XERR, 
"path cannot be resolved! (" << path << 
")");
 
 1079   const char *file = 
spath.c_str();
 
 1083   if (stat(file, &
st) != 0) {
 
 1084      emsg = 
"could not get size of file with stat: errno: ";
 
 1096      cmd = 
new char[
lcmd];
 
 1099      } 
else if (opt == 2) {
 
 1101      } 
else if (opt == 3) {
 
 1108      cmd = 
new char[
lcmd];
 
 1116      emsg = 
"could not run '";
 
 1129   int bufsiz = 0, left = 0, lines = 0;
 
 1136      if (!buf || (
llen > left)) {
 
 1144         emsg = 
"could not allocate enough memory on the heap: errno: ";
 
 
 1181                                        kXR_int64 ofs, 
int &
len, 
int grep)
 
 1186         ", file: " << (file ? file : 
"undef") << 
", ofs: " << ofs <<
 
 1187         ", len: " << 
len << 
", grep: " << 
grep);
 
 1190   if (!file || 
strlen(file) <= 0) {
 
 1197      u.TakeUrl(XrdOucString(file));
 
 1205   if (conn && conn->
IsValid()) {
 
 1211      reqhdr.readbuf.ofs = ofs;
 
 1215      const void *
btmp = (
const void *) file;
 
 1222      if (
xrsp && buf && (
xrsp->DataLen() > 0)) {
 
 
 1251         ", msg: " << (
msg ? 
msg : 
"undef") << 
", isess: " << 
isess);
 
 1263   if (conn && conn->
IsValid()) {
 
 1273      const void *
btmp = (
const void *) 
msg;
 
 1280      if (
xrsp && buf && (
xrsp->DataLen() > 0)) {
 
 1282         buf = (
char *) 
realloc((
void *)buf, 
len + 1);
 
 
 1310   char *buf = 0, *
pbuf = buf;
 
 1313   std::list<XrdProofWorker *>::iterator 
iw = 
fNodes.begin();
 
 1318         bool us = (((
w->fHost.find(
"localhost") != 
STR_NPOS ||
 
 1320                    (
w->fPort == -1 || 
w->fPort == 
fMgr->
Port())) ? 1 : 0;
 
 1326            if (
w->fPort != -1) {
 
 1342            TRACE(
DBG, 
"request for ourselves: ignore");
 
 
 1359   XPDLOC(
NMGR, 
"NetMgr::CreateDefaultPROOFcfg")
 
 1371      XrdOucString mm(
"master ", 128);
 
 1378         mm = 
"worker localhost port=";
 
 1388   std::list<XrdProofWorker *>::iterator 
w = 
fDfltWorkers.begin();
 
 
 1420            TRACE(
XERR, 
"unable to read the configuration file");
 
 1421            return (std::list<XrdProofWorker *> *)0;
 
 
 1437   const char *
xpdloc = 
"NetMgr::Dump";
 
 1441   XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
 
 1442   XPDPRT(
"+ Active workers status");
 
 1446   std::list<XrdProofWorker *>::iterator 
iw;
 
 1448      XPDPRT(
"+ wrk: " << (*iw)->fHost << 
":" << (*iw)->fPort << 
" type:" << (*iw)->fType <<
 
 1449             " active sessions:" << (*iw)->Active());
 
 1452   XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
 
 
 1473            TRACE(
XERR, 
"unable to read the configuration file");
 
 1474            return (std::list<XrdProofWorker *> *)0;
 
 1478   TRACE(
DBG, 
"returning list with " << 
fNodes.size() << 
" entries");
 
 
 1514   TRACE(
DBG, 
"time of last modification: " << 
st.st_mtime);
 
 1528         TRACE(
XERR, 
"continuing with existing list of workers.");
 
 1542      XrdOucString mm(
"master ", 128);
 
 1547      std::list<XrdProofWorker *>::iterator 
w = 
fRegWorkers.begin();
 
 1561      while (
lin[
p++] == 
' ') {
 
 1565      if (
lin[
p] == 
'\0' || 
lin[
p] == 
'\n')
 
 1581      const char *
pfx[2] = { 
"master", 
"node" };
 
 1585         if (
pw->fHost.beginswith(
"localhost") ||
 
 1595         std::list<XrdProofWorker *>::iterator 
w = 
fRegWorkers.begin();
 
 1600            if (!((*w)->fActive)) {
 
 1601               if ((*w)->fHost == 
pw->fHost && (*w)->fPort == 
pw->fPort) {
 
 1622   std::list<XrdProofWorker *>::iterator 
w = 
fRegWorkers.begin();
 
 1624      if ((*w)->fActive) {
 
 1639   return ((
nw == 0) ? -1 : 0);
 
 
 1659      std::list<XrdProofWorker *>::iterator 
w = 
fWorkers.begin();
 
 1661      for (; 
w != 
fWorkers.end(); ++
w) 
if ((*w)->fActive) {
 
 1663            std::list<XrdProofWorker *>::iterator 
n;
 
 1665               if ((*n)->Matches(*
w)) {
 
 
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define TRACE(Flag, Args)
static unsigned int total
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
#define NAME_REQUESTTIMEOUT
#define EnvPutInt(name, val)
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
#define XPD_SETRESP(p, x)
#define TRACEP(p, act, x)
#define XrdSysMutexHelper
const_iterator begin() const
const_iterator end() const
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdProofdNetMgr * NetMgr() const
const char * Host() const
const char * EffectiveUser() const
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
std::list< XrdProofWorker * > fDfltWorkers
void BalanceNodesOrder()
Indices (this will be used twice).
std::list< XrdProofWorker * > fNodes
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdOucError *e)
Constructor.
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
void RegisterDirectives()
Register config directives.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
std::list< XrdProofWorker * > fWorkers
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
std::list< XrdProofWorker * > fRegWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual ~XrdProofdNetMgr()
Destructor.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...