50#include "XrdOuc/XrdOucString.hh" 
   51#include "XrdOuc/XrdOucStream.hh" 
   94      TRACE(
XERR, 
"undefined scheduler: cannot start");
 
  111            XPDERR(
"problems receiving message; errno: "<<-
rc);
 
  130         TRACE(
ALL, 
"running regular checks");
 
 
  148            lhs->GetNActiveSessions() < 
rhs->GetNActiveSessions()) ? 1 : 0);
 
 
  156   if (!
d || !(
d->fVal))
 
 
  208   if (
d->fName == 
"schedparam") {
 
  210   } 
else if (
d->fName == 
"resource") {
 
 
  244      XPDERR(
"problems parsing file ");
 
  254   XPDFORM(
msg, 
"maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
 
  262                           (
void *)
this, 0, 
"Scheduler cron thread") != 0) {
 
  263         XPDERR(
"could not start cron thread");
 
 
  283      std::list<XrdProofdProofServ *>::iterator 
ii;
 
 
  305   TRACE(
ALL,
" ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
 
  306   if (prefix) 
TRACE(
ALL, 
" +++ Called from: "<<prefix);
 
  308   std::list<XrdProofdProofServ *>::iterator 
ii;
 
  311      TRACE(
ALL,
" +++ #"<<++i<<
" client:"<< (*ii)->Client()<<
" # of queries: "<< (*ii)->Queries()->size());
 
  313   TRACE(
ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
 
 
  329   while (
xps && !(
xps->IsValid())) {
 
 
  348   std::list<XrdProofWorker *>::iterator iter;
 
  350      TRACE(
DBG, (*iter)->fImage<<
" : # act: "<<(*iter)->fProofServs.size());
 
  351      if ((*iter)->fType != 
'M' && (*iter)->fType != 
'S' 
  363      std::list<XrdProofdProofServ *>::iterator 
sesIter;
 
  366         if ((*sesIter)->Group()) {
 
  379   } 
else if (
nWrks >= (
int) 
wrks->size()) {
 
 
  398                              std::list<XrdProofWorker *> *
wrks,
 
  420      const char *
cqtag = (
xps->CurrentQuery()) ? 
xps->CurrentQuery()->GetTag() : 
"undef";
 
  425         TRACE(
REQ, 
"current assignment for session "<< 
xps->SrvPID() << 
" is valid");
 
  444         TRACE(
REQ, 
"session has already assigned workers: enqueue");
 
  450   std::list<XrdProofWorker *> *
acws = 0;
 
  491            TRACE(
REQ, 
"no workers currently available: session enqueued");
 
  504   std::list<XrdProofWorker *> *
acwseff = 0;
 
  509         acwseff = 
new std::list<XrdProofWorker *>;
 
  511         if ((*xWrk)->Active() < 
maxnum) {
 
  515               if ((*xWrk)->Active() < 
maxnum) {
 
  552         TRACE(
REQ, 
"no workers currently available: session enqueued");
 
  556         TRACE(
XERR, 
"no worker available: do nothing");
 
  579            const char *
randdev = 
"/dev/urandom";
 
  583               if (read(fd, &seed, 
sizeof(seed)) != 
sizeof(seed)) {
 
  594         std::vector<XrdProofWorker *> 
vwrk(
nwt);
 
  599         std::list<XrdProofWorker *>::iterator 
iwk = 
acws->
begin();
 
  603            int na = (*iwk)->Active();
 
  611         for (i = 1; i < 
nwt; i++) {
 
  626               for (i = 0; i < 
nwt; i++) {
 
  630                     for (
j = i; 
j < 
nwt; 
j++) {
 
  677      std::list<XrdProofWorker *>::iterator 
iw = 
acws->
begin();
 
  687   if (
wrks->size() <= 1) {
 
  688      TRACE(
XERR, 
"no worker found: do nothing");
 
 
  717         TRACE(
XERR, 
"got undefined session: protocol error!");
 
  723      if (
xps && 
xps->CurrentQuery()) {
 
  724         qtag = 
xps->CurrentQuery()->GetTag();
 
  727            qtag.replace(
":",
"");
 
  732         TRACE(
XERR, 
"failure from GetWorkers: protocol error!");
 
  745            if (
xps->Queries()->size() > 1)
 
 
  763   const char *
osel[] = { 
"all", 
"round-robin", 
"random", 
"load-based"};
 
  764   sbuf += 
"Selection: ";
 
  767      sbuf += 
", max workers: ";
 
  773   std::list<XrdProofWorker *>::iterator 
iw;
 
  775      sbuf += (*iw)->fType;
 
  777      if ((*iw)->fPort > -1) {
 
  781      sbuf += 
"  sessions: "; 
sbuf += (*iw)->Active();
 
 
  801   if (
d->fName == 
"schedparam") {
 
  803   } 
else if (
d->fName == 
"resource") {
 
 
  822   while (val && val[0]) {
 
  824      if (s.beginswith(
"wmx:")) {
 
  825         s.replace(
"wmx:",
"");
 
  827      } 
else if (s.beginswith(
"mxsess:")) {
 
  828         s.replace(
"mxsess:",
"");
 
  830      } 
else if (s.beginswith(
"mxrun:")) {
 
  831         s.replace(
"mxrun:",
"");
 
  833      } 
else if (s.beginswith(
"selopt:")) {
 
  834         if (s.endswith(
"random"))
 
  836         else if (s.endswith(
"load"))
 
  840      } 
else if (s.beginswith(
"fraction:")) {
 
  841         s.replace(
"fraction:",
"");
 
  843      } 
else if (s.beginswith(
"optnwrks:")) {
 
  844         s.replace(
"optnwrks:",
"");
 
  846      } 
else if (s.beginswith(
"minforquery:")) {
 
  847         s.replace(
"minforquery:",
"");
 
  849      } 
else if (s.beginswith(
"queue:")) {
 
  850         if (s.endswith(
"fifo")) {
 
  853      } 
else if (
strncmp(val, 
"default", 7)) {
 
  858      val = cfg->GetWord();
 
  872      TRACE(
ALL, 
"WARNING: in Load-Based mode the max number of sessions" 
  873                 " to be run is determined dynamically");
 
 
  892   while ((val = cfg->GetWord()) && val[0]) {
 
  894      if (s.beginswith(
"wmx:")) {
 
  895         s.replace(
"wmx:",
"");
 
  897      } 
else if (s.beginswith(
"mxsess:")) {
 
  898         s.replace(
"mxsess:",
"");
 
  900      } 
else if (s.beginswith(
"selopt:")) {
 
  901         if (s.endswith(
"random"))
 
 
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define TRACE(Flag, Args)
winID h TVirtualViewer3D TVirtualGLPainter p
const char *const XPD_GW_QueryEnqueued
const char *const XPD_GW_Static
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
const_iterator begin() const
const_iterator end() const
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group 'grp.
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker * > *, const char *)
Get a list of workers that can be used by session 'xps'.
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdOucError *e=0)
Constructor.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual int Reschedule()
Consider starting some query from the queue.
virtual void RegisterDirectives()
Register directives for configuration.
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first querym enqueue also the session.
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
std::list< XrdProofdProofServ * > fQueue
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
XrdProofGroupMgr * fGrpMgr
virtual int Config(bool rcf=0)
Configure this instance using the content of file 'cfn'.
virtual void ResetParameters()
Reset values for the configurable parameters.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process 'schedparam' directive.
static void Sort(std::list< XrdProofWorker * > *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
XrdProofdNetMgr * NetMgr() const
XrdProofdProofServMgr * SessionMgr() const
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
std::list< XrdProofdProofServ * > * ActiveSessions()