23 #include "XrdOuc/XrdOucStream.hh"    37    std::list<XrdProofdSessionEntry *> *fSortedList;
    39 } XpdCreateActiveList_t;
    53    XPDLOC(PMGR, 
"PriorityCron")
    57       TRACE(REQ, 
"undefined manager: cannot start");
    67          if ((rc = mgr->
Pipe()->
Recv(msg)) != 0) {
    68             XPDERR(
"problems receiving message; errno: "<<-rc);
    73             XrdOucString usr, grp;
    74             int opt = 0, pid = -1;
    76             rc = (rc == 0) ? msg.
Get(usr) : rc;
    77             rc = (rc == 0) ? msg.
Get(grp) : rc;
    78             rc = (rc == 0) ? msg.
Get(pid) : rc;
    80                XPDERR(
"kChangeStatus: problems parsing message : '"<<msg.
Buf()<<
"'; errno: "<<-rc);
    88                mgr->
AddSession(usr.c_str(), grp.c_str(), pid);
    90                XPDERR(
"kChangeStatus: invalid opt: "<< opt);
    96             rc = (rc == 0) ? msg.
Get(prio) : rc;
    98                XPDERR(
"kSetGroupPriority: problems parsing message; errno: "<<-rc);
   104             XPDERR(
"unknown message type: "<< msg.
Type());
   108             XPDERR(
"problem setting nice values ");
   124    XPDLOC(PMGR, 
"XrdProofdPriorityMgr")
   133       TRACE(XERR, 
"unable to generate pipe for the priority poller");
   146    XPDLOC(PMGR, 
"DumpPriorityChanges")
   152       XPDFORM(msg, 
"priority will be changed by %d for user(s): %s",
   169    XPDLOC(PMGR, 
"PriorityMgr::Config")
   173       XPDERR(
"problems parsing file ");
   178    msg = (rcf) ? 
"re-configuring" : 
"configuring";
   185       TRACE(ALL, 
"no priority changes requested");
   190       XPDFORM(msg, 
"worker sched based on '%s' priorities",
   199                               (
void *)
this, 0, 
"PriorityMgr poller thread") != 0) {
   200          XPDERR(
"could not start poller thread");
   203       TRACE(ALL, 
"poller thread started");
   223                                   char *val, XrdOucStream *cfg, 
bool rcf)
   225    XPDLOC(PMGR, 
"PriorityMgr::DoDirective")
   231    if (d->
fName == 
"priority") {
   233    } 
else if (d->
fName == 
"schedopt") {
   276    XPDLOC(PMGR, 
"CreateActiveList")
   278    XpdCreateActiveList_t *cal = (XpdCreateActiveList_t *)s;
   283       std::list<XrdProofdSessionEntry *> *sorted = cal->fSortedList;
   293                std::list<XrdProofdSessionEntry *>::iterator ssvi;
   294                for (ssvi = sorted->begin() ; ssvi != sorted->end(); ssvi++) {
   295                   if (ef >= (*ssvi)->fFracEff) {
   296                      sorted->insert(ssvi, e);
   302                   sorted->push_back(e);
   307                emsg = 
"no srv sessions for active client";
   310             emsg = 
"group not found: "; emsg += e->
fGroup.c_str();
   313          emsg = 
"group manager undefined";
   316       emsg = 
"input structure or entry undefined";
   320    if (cal) cal->error = 1;
   321    TRACE(XERR, (e ? e->
fUser : 
"---") << 
": protocol error: "<<emsg);
   336    XPDLOC(PMGR, 
"PriorityMgr::SetNiceValues")
   338    TRACE(REQ, 
"------------------- Start ----------------------");
   340    TRACE(REQ, 
"opt: "<<opt);
   344       TRACE(REQ, 
"------------------- End ------------------------");
   356       TRACE(REQ, 
"------------------- End ------------------------");
   366       TRACE(XERR, 
"failure from SetEffectiveFractions");
   371    TRACE(DBG,  
"creating a list of active sessions sorted by decreasing effective fraction ");
   372    std::list<XrdProofdSessionEntry *> sorted;
   373    XpdCreateActiveList_t cal = { 
fMgr->
GroupsMgr(), &sorted, 0 };
   380       std::list<XrdProofdSessionEntry *>::iterator ssvi;
   382          for (ssvi = sorted.begin() ; ssvi != sorted.end(); ssvi++)
   383             TRACE(HDBG, i++ <<
" eff: "<< (*ssvi)->fFracEff);
   386       TRACE(DBG,  
"calculating nice values");
   389       ssvi = sorted.begin();
   390       float xmax = (*ssvi)->fFracEff;
   394          (*ssvi)->SetPriority(nice);
   397          while (ssvi != sorted.end()) {
   401             TRACE(DBG,  
"    --> nice value for client "<< (*ssvi)->fUser<<
" is "<<nice);
   402             (*ssvi)->SetPriority(nice);
   406          TRACE(XERR, 
"negative or null max effective fraction: "<<xmax);
   410       TRACE(XERR, 
"failure from CreateActiveList");
   413    TRACE(REQ, 
"------------------- End ------------------------");
   429    int dp = strtol(val,0,10);
   432    if ((val = cfg->GetWord()) && !strncmp(val,
"if",2)) {
   433       if ((val = cfg->GetWord()) && val[0]) {
   447    XPDLOC(PMGR, 
"PriorityMgr::DoDirectiveSchedOpt")
   457    while (val && val[0]) {
   458       XrdOucString o = val;
   459       if (o.beginswith(
"min:")) {
   461          o.replace(
"min:",
"");
   463       } 
else if (o.beginswith(
"max:")) {
   465          o.replace(
"max:",
"");
   470          else if (o == 
"local")
   478       val = cfg->GetWord();
   492       TRACE(XERR, 
"inconsistent value for fPriorityMin (> fPriorityMax) ["<<
   506    XrdOucString key; key += pid;
   517    XrdOucString key; key += pid;
   536    XPDLOC(PMGR, 
"PriorityMgr::SetProcessPriority")
   546          if ((priority = getpriority(PRIO_PROCESS, pid)) == -1 && errno != 0) {
   547             TRACE(XERR, 
"getpriority: errno: " << errno);
   551          int newp = priority + dp;
   556             TRACE(XERR, 
"could not get privileges");
   559          TRACE(REQ, 
"got privileges ");
   561          if (setpriority(PRIO_PROCESS, pid, newp) != 0) {
   562             TRACE(XERR, 
"setpriority: errno: " << errno);
   563             return ((errno != 0) ? -errno : -1);
   565          if ((getpriority(PRIO_PROCESS, pid)) != newp && errno != 0) {
   566             TRACE(XERR, 
"did not succeed: errno: " << errno);
   583                      : fUser(u), fGroup(g), fPid(pid), fFracEff(0.)
   585    XPDLOC(PMGR, 
"XrdProofdSessionEntry")
   590    int prio = getpriority(PRIO_PROCESS, pid);
   592       TRACE(XERR, 
" getpriority: errno: " << errno);
   611    XPDLOC(PMGR, 
"SessionEntry::SetPriority")
   622          TRACE(XERR, 
"could not get privileges");
   626       if (setpriority(PRIO_PROCESS, 
fPid, priority) != 0) {
   627          TRACE(XERR, 
"setpriority: errno: " << errno);
 
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns. 
int RemoveSession(int pid)
Remove from the active list the session with ID pid. 
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way. 
void SetSchedOpt(int opt)
int SetNiceValues(int opt=0)
Recalculate nice values taking into account all active users and their priorities. 
XrdProofdPriorityMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor. 
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor. 
#define TRACE(Flag, Args)
XrdProofGroupMgr * GroupsMgr() const 
void SetGroupPriority(const char *grp, int priority)
Change group priority. Used when a master pushes a priority to a worker. 
int SetEffectiveFractions(bool optprio)
Go through the list of active groups (those having at least a non-idle member) and determine the effe...
int DoDirectiveSchedOpt(char *, XrdOucStream *, bool)
Process 'schedopt' directive. 
XrdOucHash< XrdProofdPriority > fPriorities
void SetPriority(float p)
XrdProofGroup * GetGroup(const char *grp)
Returns the instance for group 'grp'. 
int Recv(XpdMsg &msg)
Recv message from the pipe. 
int SetProcessPriority(int pid, const char *usr, int &dp)
Change priority of process pid belonging to user, if needed. 
int DoDirectivePriority(char *, XrdOucStream *, bool)
Process 'priority' directive. 
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions. 
#define XrdSysMutexHelper
int AddSession(const char *u, const char *g, int pid)
Add to the active list a session with ID pid. 
static int ResetEntryPriority(const char *, XrdProofdSessionEntry *e, void *)
Reset the priority on entries. 
#define XpdBadPGuard(g, u)
int Active(const char *usr=0)
Return the number of active groups (usr = 0) or the number of active sessions for user 'usr'...
static int DumpPriorityChanges(const char *, XrdProofdPriority *p, void *s)
Dump the priority change that will be applied for the user(s) of an entry. 
XrdProofdSessionEntry(const char *u, const char *g, int pid)
Constructor. 
int SetPriority(int priority=XPPM_NOPRIORITY)
Change process priority. 
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive. If none (valid) is found, return -1. 
int Config(bool rcf=0)
Run configuration and parse the entered config directives. 
const char * Host() const 
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
static int CreateActiveList(const char *, XrdProofdSessionEntry *e, void *s)
Run through entries to create the sorted list of active entries. 
XrdOucHash< XrdProofdSessionEntry > fSessions
int Get(int &i)
Get next token and interpret it as an int. 
virtual ~XrdProofdSessionEntry()
Destructor. 
virtual int Config(bool rcf=0)
void RegisterDirectives()
Register directives for configuration. 
void Register(const char *dname, XrdProofdDirective *d)
void * XrdProofdPriorityCron(void *p)
This is an endless loop to periodically check the system.