Logo ROOT   6.08/07
Reference Guide
TXProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TXProofServ
13 \ingroup proofx
14 
15 This class implements the XProofD version of TProofServ, from which it differs only
16 in the underlying connection technology.
17 
18 */
19 
20 #include "RConfigure.h"
21 #include "RConfig.h"
22 #include "Riostream.h"
23 
24 #ifdef WIN32
25  #include <io.h>
26  typedef long off_t;
27 #endif
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <utime.h>
31 
32 #include "TXProofServ.h"
33 #include "TObjString.h"
34 #include "TEnv.h"
35 #include "TError.h"
36 #include "TException.h"
37 #include "THashList.h"
38 #include "TInterpreter.h"
39 #include "TParameter.h"
40 #include "TProofDebug.h"
41 #include "TProof.h"
42 #include "TProofPlayer.h"
43 #include "TQueryResultManager.h"
44 #include "TRegexp.h"
45 #include "TClass.h"
46 #include "TROOT.h"
47 #include "TSystem.h"
48 #include "TPluginManager.h"
49 #include "TXSocketHandler.h"
50 #include "TXUnixSocket.h"
51 #include "compiledata.h"
52 #include "TProofNodeInfo.h"
53 #include "XProofProtocol.h"
54 
57 
58 
59 // debug hook: CreateServer() spins on this flag while it is non-zero when
// Proof.GdbHook is 1 (master) or 2 (worker), leaving time to attach a debugger.
// Nothing in this file resets it; presumably it is cleared from the attached
// debugger to let the start-up continue.
60 static volatile Int_t gProofServDebug = 1;
61 
62 //----- SigPipe signal handler -------------------------------------------------
63 ////////////////////////////////////////////////////////////////////////////////
64 
65 class TXProofServSigPipeHandler : public TSignalHandler {
66  TXProofServ *fServ;
67 public:
68  TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
69  { fServ = s; }
70  Bool_t Notify();
71 };
72 
73 ////////////////////////////////////////////////////////////////////////////////
74 
75 Bool_t TXProofServSigPipeHandler::Notify()
76 {
77  fServ->HandleSigPipe();
78  return kTRUE;
79 }
80 
81 //----- Termination signal handler ---------------------------------------------
82 ////////////////////////////////////////////////////////////////////////////////
83 
84 class TXProofServTerminationHandler : public TSignalHandler {
85  TXProofServ *fServ;
86 public:
87  TXProofServTerminationHandler(TXProofServ *s)
88  : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
89  Bool_t Notify();
90 };
91 
92 ////////////////////////////////////////////////////////////////////////////////
93 
94 Bool_t TXProofServTerminationHandler::Notify()
95 {
96  Printf("Received SIGTERM: terminating");
97 
98  fServ->HandleTermination();
99  return kTRUE;
100 }
101 
102 //----- Seg violation signal handler ---------------------------------------------
103 ////////////////////////////////////////////////////////////////////////////////
104 
105 class TXProofServSegViolationHandler : public TSignalHandler {
106  TXProofServ *fServ;
107 public:
108  TXProofServSegViolationHandler(TXProofServ *s)
110  Bool_t Notify();
111 };
112 
113 ////////////////////////////////////////////////////////////////////////////////
114 
115 Bool_t TXProofServSegViolationHandler::Notify()
116 {
117  Printf("**** ");
118  Printf("**** Segmentation violation: terminating ****");
119  Printf("**** ");
120  fServ->HandleTermination();
121  return kTRUE;
122 }
123 
124 //----- Input handler for messages from parent or master -----------------------
125 ////////////////////////////////////////////////////////////////////////////////
126 
127 class TXProofServInputHandler : public TFileHandler {
128  TXProofServ *fServ;
129 public:
130  TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
131  { fServ = s; }
132  Bool_t Notify();
133  Bool_t ReadNotify() { return Notify(); }
134 };
135 
136 ////////////////////////////////////////////////////////////////////////////////
137 
138 Bool_t TXProofServInputHandler::Notify()
139 {
140  fServ->HandleSocketInput();
141  // This request has been completed: remove the client ID from the pipe
142  ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
143  return kTRUE;
144 }
145 
147 
148 // Hook to the constructor. This is needed to avoid using the plugin manager
149 // which may create problems in multi-threaded environments.
150 extern "C" {
151  TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
152  { return new TXProofServ(argc, argv, flog); }
153 }
154 
155 ////////////////////////////////////////////////////////////////////////////////
156 /// Main constructor
157 
158 TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
159  : TProofServ(argc, argv, flog)
160 {
161  fInterruptHandler = 0;
162  fInputHandler = 0;
164 
165  // TODO:
166  // Int_t useFIFO = 0;
167 /* if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
168  if (useFIFO == 1)
169  Info("", "enablig use of FIFO (if allowed by the server)");
170  else
171  Warning("", "unsupported strategy index (%d): ignore", strategy);
172  }
173 */
174 }
175 
176 ////////////////////////////////////////////////////////////////////////////////
177 /// Finalize the server setup. If master, create the TProof instance to talk
178 /// to the worker or submaster nodes.
179 /// Return 0 on success, -1 on error
180 
182 {
183  Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
184 
185  if (gProofDebugLevel > 0)
186  Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
187 
188  // Get file descriptor for log file
189  if (fLogFile) {
190  // Use the file already open by pmain
191  if ((fLogFileDes = fileno(fLogFile)) < 0) {
192  Error("CreateServer", "resolving the log file description number");
193  return -1;
194  }
195  // Hide the session start-up logs unless we are in verbose mode
196  if (gProofDebugLevel <= 0)
197  lseek(fLogFileDes, (off_t) 0, SEEK_END);
198  }
199 
200  // Global location string in TXSocket
201  TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
202 
203  // Set debug level in XrdClient
204  EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
205 
206  // Get socket to be used to call back our xpd
207  if (xtest) {
208  // test session, just send the protocol version on the open pipe
209  // and exit
210  if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
211  Error("CreateServer", "test: socket setup by xpd undefined");
212  return -1;
213  }
214  Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
215  int proto = htonl(kPROOF_Protocol);
216  fSockPath = "";
217  if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
218  Error("CreateServer", "test: sending protocol number");
219  return -1;
220  }
221  exit(0);
222  } else {
223  fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
224  if (fSockPath.Length() <= 0) {
225  Error("CreateServer", "socket setup by xpd undefined");
226  return -1;
227  }
228  TString entity = gEnv->GetValue("ProofServ.Entity", "");
229  if (entity.Length() > 0)
230  fSockPath.Insert(0,Form("%s/", entity.Data()));
231  }
232 
233  // Get open socket descriptor, if any
234  Int_t sockfd = -1;
235  const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
236  if (opensock && strlen(opensock) > 0) {
238  sockfd = (Int_t) strtol(opensock, 0, 10);
239  if (TSystem::GetErrno() == ERANGE) {
240  sockfd = -1;
241  Warning("CreateServer", "socket descriptor: wrong conversion from '%s'", opensock);
242  }
243  if (sockfd > 0 && gProofDebugLevel > 0)
244  Info("CreateServer", "using open connection (descriptor %d)", sockfd);
245  }
246 
247  // Get the sessions ID
248  Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
249  if (psid < 0) {
250  Error("CreateServer", "Session ID undefined");
251  return -1;
252  }
253 
254  // Call back the server
255  fSocket = new TXUnixSocket(fSockPath, psid, -1, this, sockfd);
256  if (!fSocket || !(fSocket->IsValid())) {
257  Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
258  return -1;
259  }
260  // Set compression level, if any
262 
263  // Set the title for debugging
264  TString tgt("client");
265  if (fOrdinal != "0") {
266  tgt = fOrdinal;
267  if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
268  }
269  fSocket->SetTitle(tgt);
270 
271  // Set this object as the reference of this socket
272  ((TXSocket *)fSocket)->fReference = this;
273 
274  // Get socket descriptor
275  Int_t sock = fSocket->GetDescriptor();
276 
277  // Install message input handlers
278  fInputHandler =
279  TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
281 
282  // Get the client ID
283  Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
284  if (cid < 0) {
285  Error("CreateServer", "Client ID undefined");
286  SendLogFile();
287  return -1;
288  }
289  ((TXSocket *)fSocket)->SetClientID(cid);
290 
291  // debug hooks
292  if (IsMaster()) {
293  // wait (loop) in master to allow debugger to connect
294  if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
295  while (gProofServDebug)
296  ;
297  }
298  } else {
299  // wait (loop) in slave to allow debugger to connect
300  if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
301  while (gProofServDebug)
302  ;
303  }
304  }
305 
306  if (gProofDebugLevel > 0)
307  Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
309 
310  if (Setup() == -1) {
311  // Setup failure
312  LogToMaster();
313  SendLogFile();
314  Terminate(0);
315  return -1;
316  }
317 
318  if (!fLogFile) {
319  RedirectOutput();
320  // If for some reason we failed setting a redirection file for the logs
321  // we cannot continue
322  if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
323  LogToMaster();
324  SendLogFile(-98);
325  Terminate(0);
326  return -1;
327  }
328  }
329 
330  // Send message of the day to the client
331  if (IsMaster()) {
332  if (CatMotd() == -1) {
333  LogToMaster();
334  SendLogFile(-99);
335  Terminate(0);
336  return -1;
337  }
338  }
339 
340  // Everybody expects iostream to be available, so load it...
341  ProcessLine("#include <iostream>", kTRUE);
342  ProcessLine("#include <string>",kTRUE); // for std::string iostream.
343 
344  // Load user functions
345  const char *logon;
346  logon = gEnv->GetValue("Proof.Load", (char *)0);
347  if (logon) {
348  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
349  if (mac)
350  ProcessLine(Form(".L %s", logon), kTRUE);
351  delete [] mac;
352  }
353 
354  // Execute logon macro
355  logon = gEnv->GetValue("Proof.Logon", (char *)0);
356  if (logon && !NoLogOpt()) {
357  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
358  if (mac)
359  ProcessFile(logon);
360  delete [] mac;
361  }
362 
363  // Save current interpreter context
364  gInterpreter->SaveContext();
365  gInterpreter->SaveGlobalsContext();
366 
367  // if master, start slave servers
368  if (IsMaster()) {
369  TString master;
370 
371  if (fConfFile.BeginsWith("lite:")) {
372  master = "lite://";
373  } else {
374  master.Form("proof://%s@__master__", fUser.Data());
375 
376  // Add port, if defined
377  Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
378  if (port > -1) {
379  master += ":";
380  master += port;
381  }
382  }
383 
384  // Make sure that parallel startup via threads is not active
385  // (it is broken for xpd because of the locks on gInterpreterMutex)
386  gEnv->SetValue("Proof.ParallelStartup", 0);
387 
388  // Get plugin manager to load appropriate TProof from
389  TPluginManager *pm = gROOT->GetPluginManager();
390  if (!pm) {
391  Error("CreateServer", "no plugin manager found");
392  SendLogFile(-99);
393  Terminate(0);
394  return -1;
395  }
396 
397  // Find the appropriate handler
398  TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
399  if (!h) {
400  Error("CreateServer", "no plugin found for TProof with a"
401  " config file of '%s'", fConfFile.Data());
402  SendLogFile(-99);
403  Terminate(0);
404  return -1;
405  }
406 
407  // load the plugin
408  if (h->LoadPlugin() == -1) {
409  Error("CreateServer", "plugin for TProof could not be loaded");
410  SendLogFile(-99);
411  Terminate(0);
412  return -1;
413  }
414 
415  // Make instance of TProof
416  if (fConfFile.BeginsWith("lite:")) {
417  // Remove input and signal handlers to avoid spurious "signals"
418  // during startup
420  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(6, master.Data(),
421  0, 0,
422  fLogLevel,
423  fSessionDir.Data(), 0));
424  // Re-enable input and signal handlers
426  } else {
427  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
428  fConfFile.Data(),
429  fConfDir.Data(),
430  fLogLevel,
431  fTopSessionTag.Data()));
432  }
433 
434  // Save worker info
435  if (fProof) fProof->SaveWorkerInfo();
436 
437  if (!fProof || (fProof && !fProof->IsValid())) {
438  Error("CreateServer", "plugin for TProof could not be executed");
439  FlushLogFile();
440  delete fProof;
441  fProof = 0;
442  SendLogFile(-99);
443  Terminate(0);
444  return -1;
445  }
446  // Find out if we are a master in direct contact only with workers
448 
449  SendLogFile();
450  }
451 
452  // Setup the shutdown timer
453  if (!fShutdownTimer) {
454  // Check activity on socket every 5 mins
455  fShutdownTimer = new TShutdownTimer(this, 300000);
457  }
458 
459  // Check if schema evolution is effective: clients running versions <=17 do not
460  // support that: send a warning message
461  if (fProtocol <= 17) {
462  TString msg;
463  msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
464  " This may generate compatibility problems between streamed objects.\n"
465  " The advise is to move to ROOT >= 5.21/02 .");
466  SendAsynMessage(msg.Data());
467  }
468 
469  // Setup the idle timer
470  if (IsMaster() && !fIdleTOTimer) {
471  // Check activity on socket every 5 mins
472  Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
473  if (idle_to > 0) {
474  fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
475  fIdleTOTimer->Start(-1, kTRUE);
476  if (gProofDebugLevel > 0)
477  Info("CreateServer", " idle timer started (%d secs)", idle_to);
478  } else if (gProofDebugLevel > 0) {
479  Info("CreateServer", " idle timer not started (no idle timeout requested)");
480  }
481  }
482 
483  // Done
484  return 0;
485 }
486 
487 ////////////////////////////////////////////////////////////////////////////////
488 /// Cleanup. Not really necessary since after this dtor there is no
489 /// life anyway.
490 
492 {
493  delete fSocket;
494 }
495 
496 ////////////////////////////////////////////////////////////////////////////////
497 /// Handle high priority data sent by the master or client.
498 
500 {
501  // Real-time notification of messages
503 
504  // Get interrupt
505  Bool_t fw = kFALSE;
506  Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
507  if (iLev < 0) {
508  Error("HandleUrgentData", "error receiving interrupt");
509  return;
510  }
511 
512  PDB(kGlobal, 2)
513  Info("HandleUrgentData", "got interrupt: %d\n", iLev);
514 
515  if (fProof)
516  fProof->SetActive();
517 
518  switch (iLev) {
519 
520  case TProof::kPing:
521  PDB(kGlobal, 2)
522  Info("HandleUrgentData", "*** Ping");
523 
524  // If master server, propagate interrupt to slaves
525  if (fw && IsMaster()) {
526  Int_t nbad = fProof->fActiveSlaves->GetSize() - fProof->Ping();
527  if (nbad > 0) {
528  Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
529  }
530  }
531 
532  // Touch the admin path to show we are alive
533  if (fAdminPath.IsNull()) {
534  fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
535  }
536 
537  if (!fAdminPath.IsNull()) {
538  if (!fAdminPath.EndsWith(".status")) {
539  // Update file time stamps
540  if (utime(fAdminPath.Data(), 0) != 0)
541  Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
542  else
543  PDB(kGlobal, 2)
544  Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
545  } else {
546  // Update the status in the file
547  // 0 idle
548  // 1 running
549  // 2 being terminated (currently unused)
550  // 3 queued
551  // 4 idle timed-out
552  Int_t uss_rc = UpdateSessionStatus(-1);
553  if (uss_rc != 0)
554  Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
555  }
556  } else {
557  Info("HandleUrgentData", "admin path undefined");
558  }
559 
560  break;
561 
563  Info("HandleUrgentData", "*** Hard Interrupt");
564 
565  // If master server, propagate interrupt to slaves
566  if (fw && IsMaster())
568 
569  // Flush input socket
570  ((TXSocket *)fSocket)->Flush();
571 
572  if (IsMaster())
573  SendLogFile();
574 
575  break;
576 
578  Info("HandleUrgentData", "Soft Interrupt");
579 
580  // If master server, propagate interrupt to slaves
581  if (fw && IsMaster())
583 
584  Interrupt();
585 
586  if (IsMaster())
587  SendLogFile();
588 
589  break;
590 
591 
593  Info("HandleUrgentData", "Shutdown Interrupt");
594 
595  // When returning from here connections are closed
597 
598  break;
599 
600  default:
601  Error("HandleUrgentData", "unexpected type: %d", iLev);
602  break;
603  }
604 
605 
606  if (fProof) fProof->SetActive(kFALSE);
607 }
608 
609 ////////////////////////////////////////////////////////////////////////////////
610 /// Called by the SigPipe signal handler; currently it only logs the event and does nothing else.
611 
613 {
614  // Real-time notification of messages
615 
616  Info("HandleSigPipe","got sigpipe ... do nothing");
617 }
618 
619 ////////////////////////////////////////////////////////////////////////////////
620 /// Called when the client is not alive anymore; terminate the session.
621 
623 {
624  // If master server, propagate interrupt to slaves
625  // (shutdown interrupt send internally).
626  if (IsMaster()) {
627 
628  // If not idle, try first to stop processing
629  if (!fIdle) {
630  // Remove pending requests
632  // Interrupt the current monitor
634  // Do not wait forever, but at least 20 seconds
635  Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
636  timeout = (timeout > 20) ? timeout : 20;
637  // Processing will be aborted
638  fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
639  // Receive end-of-processing messages, but do not wait for ever
640  fProof->Collect(TProof::kActive, timeout);
641  // Still not idle
642  if (!fIdle)
643  Warning("HandleTermination","processing could not be stopped");
644  }
645  // Close the session
646  if (fProof)
647  fProof->Close("S");
648  }
649 
650  Terminate(0); // will not return from here....
651 }
652 
653 ////////////////////////////////////////////////////////////////////////////////
654 /// Set up the server session: send the startup message, read the session settings from gEnv and install the signal handlers.
655 /// Return 0 on success, -1 on error
656 
658 {
659  char str[512];
660 
661  if (IsMaster()) {
662  snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
663  } else {
664  snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
665  }
666 
667  if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
668  Error("Setup", "failed to send proof server startup message");
669  return -1;
670  }
671 
672  // Get client protocol
673  if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
674  Error("Setup", "remote proof protocol missing");
675  return -1;
676  }
677 
678  // The local user
679  fUser = gEnv->GetValue("ProofServ.Entity", "");
680  if (fUser.Length() >= 0) {
681  if (fUser.Contains(":"))
682  fUser.Remove(fUser.Index(":"));
683  if (fUser.Contains("@"))
684  fUser.Remove(fUser.Index("@"));
685  } else {
687  if (pw) {
688  fUser = pw->fUser;
689  delete pw;
690  }
691  }
692 
693  // Work dir and ...
694  if (IsMaster()) {
695  TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
696  if (cf.Length() > 0)
697  fConfFile = cf;
698  }
699  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
700 
701  // Get Session tag
702  if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
703  Error("Setup", "Session tag missing");
704  return -1;
705  }
706  // Get top session tag, i.e. the tag of the PROOF session
707  if ((fTopSessionTag = gEnv->GetValue("ProofServ.TopSessionTag", "-1")) == "-1") {
708  fTopSessionTag = "";
709  // Try to extract it from log file path (for backward compatibility)
710  if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
711  fTopSessionTag = gSystem->DirName(gSystem->Getenv("ROOTPROOFLOGFILE"));
712  Ssiz_t lstl;
713  if ((lstl = fTopSessionTag.Last('/')) != kNPOS) fTopSessionTag.Remove(0, lstl + 1);
714  if (fTopSessionTag.BeginsWith("session-")) {
715  fTopSessionTag.Remove(0, strlen("session-"));
716  } else {
717  fTopSessionTag = "";
718  }
719  }
720  if (fTopSessionTag.IsNull()) {
721  Error("Setup", "top session tag missing");
722  return -1;
723  }
724  }
725 
726  // Make sure the process ID is in the tag
727  TString spid = Form("-%d", gSystem->GetPid());
728  if (!fSessionTag.EndsWith(spid)) {
729  Int_t nd = 0;
730  if ((nd = fSessionTag.CountChar('-')) >= 2) {
731  Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
732  if (id != kNPOS) fSessionTag.Remove(id);
733  } else if (nd != 1) {
734  Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
735  }
736  // Add this process ID
737  fSessionTag += spid;
738  }
739  if (gProofDebugLevel > 0)
740  Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
741 
742  // Get Session dir (sandbox)
743  if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
744  Error("Setup", "Session dir missing");
745  return -1;
746  }
747 
748  // Goto to the main PROOF working directory
749  char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
750  fWorkDir = workdir;
751  delete [] workdir;
752  if (gProofDebugLevel > 0)
753  Info("Setup", "working directory set to %s", fWorkDir.Data());
754 
755  // Common setup
756  if (SetupCommon() != 0) {
757  Error("Setup", "common setup failed");
758  return -1;
759  }
760 
761  // Send packages off immediately to reduce latency
763 
764  // Check every two hours if client is still alive
766 
767  // Install SigPipe handler to handle kKeepAlive failure
768  gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
769 
770  // Install Termination handler
771  gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
772 
773  // Install seg violation handler
774  gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
775 
776  if (gProofDebugLevel > 0)
777  Info("Setup", "successfully completed");
778 
779  // Done
780  return 0;
781 }
782 
783 ////////////////////////////////////////////////////////////////////////////////
784 /// Get list of workers to be used from now on.
785 /// The list must be provided by the caller.
786 
788  Int_t & /* prioritychange */,
789  Bool_t resume)
790 {
792 
793  // User config files, when enabled, override cluster-wide configuration
794  if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
795  Int_t pc = 1;
796  return TProofServ::GetWorkers(workers, pc);
797  }
798 
799  // seqnum of the query for which we call getworkers
800  Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
801  TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
802  if (!fWaitingQueries->IsEmpty()) {
803  if (resume) {
804  seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
805  } else {
806  seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
807  }
808  }
809  // Send request to the coordinator
810  TObjString *os = 0;
811  if (dynamicStartup) {
812  // We wait dynto seconds for the first worker to come; -1 means forever
813  Int_t dynto = gEnv->GetValue("Proof.DynamicStartupTimeout", -1);
814  Bool_t doto = (dynto > 0) ? kTRUE : kFALSE;
815  while (!(os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data()))) {
816  if (doto > 0 && --dynto < 0) break;
817  // Another second
818  gSystem->Sleep(1000);
819  }
820  } else {
821  os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
822  }
823 
824  // The reply contains some information about the master (image, workdir)
825  // followed by the information about the workers; the tokens for each node
826  // are separated by '&'
827  if (os) {
828  TString fl(os->GetName());
829  if (fl.BeginsWith(XPD_GW_QueryEnqueued)) {
830  SendAsynMessage("+++ Query cannot be processed now: enqueued");
831  return kQueryEnqueued;
832  }
833 
834  // Honour a max number of workers request (typically when running in valgrind)
835  Int_t nwrks = -1;
836  Bool_t pernode = kFALSE;
837  if (gSystem->Getenv("PROOF_NWORKERS")) {
838  TString s(gSystem->Getenv("PROOF_NWORKERS"));
839  if (s.EndsWith("x")) {
840  pernode = kTRUE;
841  s.ReplaceAll("x", "");
842  }
843  if (s.IsDigit()) {
844  nwrks = s.Atoi();
845  if (!dynamicStartup && (nwrks > 0)) {
846  // Notify, except in dynamic workers mode to avoid flooding
847  TString msg;
848  if (pernode) {
849  msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
850  } else {
851  msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
852  }
853  SendAsynMessage(msg);
854  } else {
855  nwrks = -1;
856  }
857  } else {
858  pernode = kFALSE;
859  }
860  }
861 
862  TString tok;
863  Ssiz_t from = 0;
864  TList *nodecnt = (pernode) ? new TList : 0 ;
865  if (fl.Tokenize(tok, from, "&")) {
866  if (!tok.IsNull()) {
867  TProofNodeInfo *master = new TProofNodeInfo(tok);
868  if (!master) {
869  Error("GetWorkers", "no appropriate master line got from coordinator");
870  return kQueryStop;
871  } else {
872  // Set image if not yet done and available
873  if (fImage.IsNull() && strlen(master->GetImage()) > 0)
874  fImage = master->GetImage();
875  SafeDelete(master);
876  }
877  // Now the workers
878  while (fl.Tokenize(tok, from, "&")) {
879  if (!tok.IsNull()) {
880  if (nwrks == -1 || nwrks > 0) {
881  // We have the minimal set of information to start
882  rc = kQueryOK;
883  if (pernode && nodecnt) {
884  TProofNodeInfo *ni = new TProofNodeInfo(tok);
885  TParameter<Int_t> *p = 0;
886  Int_t nw = 0;
887  if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
888  p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
889  nodecnt->Add(p);
890  }
891  nw = p->GetVal();
892  if (gDebug > 0)
893  Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
894  p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
895  if (nw < nwrks) {
896  if (workers) workers->Add(ni);
897  nw++;
898  p->SetVal(nw);
899  } else {
900  // Too many workers on this machine already
901  SafeDelete(ni);
902  }
903  } else {
904  if (workers)
905  workers->Add(new TProofNodeInfo(tok));
906  // Count down
907  if (nwrks != -1) nwrks--;
908  }
909  } else {
910  // Release this worker (to cleanup the session list in the coordinator and get a fresh
911  // and correct list next call)
912  TProofNodeInfo *ni = new TProofNodeInfo(tok);
913  ReleaseWorker(ni->GetOrdinal().Data());
914  }
915  }
916  }
917  }
918  }
919  // Cleanup
920  if (nodecnt) {
921  nodecnt->SetOwner(kTRUE);
922  SafeDelete(nodecnt);
923  }
924  }
925 
926  // We are done
927  return rc;
928 }
929 
930 ////////////////////////////////////////////////////////////////////////////////
931 /// Handle error on the input socket
932 
934 {
935  // Try reconnection
936  if (fSocket && !fSocket->IsValid()) {
937 
938  fSocket->Reconnect();
939  if (fSocket && fSocket->IsValid()) {
940  if (gDebug > 0)
941  Info("HandleError",
942  "%p: connection to local coordinator re-established", this);
943  FlushLogFile();
944  return kFALSE;
945  }
946  }
947  Printf("TXProofServ::HandleError: %p: got called ...", this);
948 
949  // If master server, propagate interrupt to slaves
950  // (shutdown interrupt send internally).
951  if (IsMaster())
952  fProof->Close("S");
953 
954  // Avoid communicating back anything to the coordinator (it is gone)
955  if (fSocket) ((TXSocket *)fSocket)->SetSessionID(-1);
956 
957  Terminate(0);
958 
959  Printf("TXProofServ::HandleError: %p: DONE ... ", this);
960 
961  // We are done
962  return kTRUE;
963 }
964 
965 ////////////////////////////////////////////////////////////////////////////////
966 /// Handle asynchronous input on the input socket
967 
969 {
970  if (gDebug > 2)
971  Printf("TXProofServ::HandleInput %p, in: %p", this, in);
972 
973  XHandleIn_t *hin = (XHandleIn_t *) in;
974  Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
975 
976  // Act accordingly
977  if (acod == kXPD_ping || acod == kXPD_interrupt) {
978  // Interrupt or Ping
980 
981  } else if (acod == kXPD_flush) {
982  // Flush stdout, so that we can access the full log file
983  Info("HandleInput","kXPD_flush: flushing log file (stdout)");
984  fflush(stdout);
985 
986  } else if (acod == kXPD_urgent) {
987  // Get type
988  Int_t type = hin->fInt2;
989  switch (type) {
991  {
992  // Abort or Stop ?
993  Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
994  // Timeout
995  Int_t timeout = hin->fInt4;
996  // Act now
997  if (fProof)
998  fProof->StopProcess(abort, timeout);
999  else
1000  if (fPlayer)
1001  fPlayer->StopProcess(abort, timeout);
1002  }
1003  break;
1004  default:
1005  Info("HandleInput","kXPD_urgent: unknown type: %d", type);
1006  }
1007 
1008  } else if (acod == kXPD_inflate) {
1009 
1010  // Obsolete type
1011  Warning("HandleInput", "kXPD_inflate: obsolete message type");
1012 
1013  } else if (acod == kXPD_priority) {
1014 
1015  // The factor is the priority to be propagated
1016  fGroupPriority = hin->fInt2;
1017  if (fProof)
1019  // Notify
1020  Info("HandleInput", "kXPD_priority: group %s priority set to %f",
1021  fGroup.Data(), (Float_t) fGroupPriority / 100.);
1022 
1023  } else if (acod == kXPD_clusterinfo) {
1024 
1025  // Information about the cluster status
1026  fTotSessions = hin->fInt2;
1027  fActSessions = hin->fInt3;
1028  fEffSessions = (hin->fInt4)/1000.;
1029  // Notify
1030  Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1032 
1033  } else {
1034  // Standard socket input
1036  // This request has been completed: remove the client ID from the pipe
1037  ((TXSocket *)fSocket)->RemoveClientID();
1038  }
1039 
1040  // We are done
1041  return kTRUE;
1042 }
1043 
1044 ////////////////////////////////////////////////////////////////////////////////
1045 /// Disable read timeout on the underlying socket
1046 
1048 {
1049  if (fSocket)
1050  ((TXSocket *)fSocket)->DisableTimeout();
1051 }
1052 
1053 ////////////////////////////////////////////////////////////////////////////////
1054 /// Enable read timeout on the underlying socket
1055 
1057 {
1058  if (fSocket)
1059  ((TXSocket *)fSocket)->EnableTimeout();
1060 }
1061 
1062 ////////////////////////////////////////////////////////////////////////////////
1063 /// Terminate the proof server.
1064 
1066 {
1067  if (fTerminated)
1068  // Avoid doubling the exit operations
1069  exit(1);
1070  fTerminated = kTRUE;
1071 
1072  // Notify
1073  Info("Terminate", "starting session termination operations ...");
1074  if (fgLogToSysLog > 0) {
1075  TString s;
1076  s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
1077  gSystem->Syslog(kLogNotice, s.Data());
1078  }
1079 
1080  // Notify the memory footprint
1081  ProcInfo_t pi;
1082  if (!gSystem->GetProcInfo(&pi)){
1083  Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1085  }
1086 
1087  // Deactivate current monitor, if any
1088  if (fProof)
1089  fProof->SetMonitor(0, kFALSE);
1090 
1091  // Cleanup session directory
1092  if (status == 0) {
1093  // make sure we remain in a "connected" directory
1094  gSystem->ChangeDirectory("/");
1095  // needed in case fSessionDir is on NFS ?!
1096  gSystem->MakeDirectory(fSessionDir+"/.delete");
1097  gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
1098  }
1099 
1100  // Cleanup queries directory if empty
1101  if (IsMaster()) {
1102  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
1103  // make sure we remain in a "connected" directory
1104  gSystem->ChangeDirectory("/");
1105  // needed in case fQueryDir is on NFS ?!
1106  gSystem->MakeDirectory(fQueryDir+"/.delete");
1107  gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
1108  // Remove lock file
1109  if (fQueryLock)
1111  }
1112 
1113  // Unlock the query dir owned by this session
1114  if (fQueryLock)
1115  fQueryLock->Unlock();
1116  } else {
1117  // Try to stop processing if any
1118  Bool_t abort = (status == 0) ? kFALSE : kTRUE;
1119  if (!fIdle && fPlayer)
1120  fPlayer->StopProcess(abort,1);
1121  gSystem->Sleep(2000);
1122  }
1123 
1124  // Cleanup data directory if empty
1126  if (UnlinkDataDir(fDataDir))
1127  Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
1128  }
1129 
1130  // Remove input and signal handlers to avoid spurious "signals"
1131  // for closing activities executed upon exit()
1133 
1134  // Stop processing events (set a flag to exit the event loop)
1135  gSystem->ExitLoop();
1136 
1137  // We post the pipe once to wake up the main thread which is waiting for
1138  // activity on this socket; this fake activity will make it return and
1139  // eventually exit the loop.
1141 
1142  // Notify
1143  Printf("Terminate: termination operations ended: quitting!");
1144 }
1145 
1146 ////////////////////////////////////////////////////////////////////////////////
1147 /// Try locking query area of session tagged sessiontag.
1148 /// The id of the locking file is returned in fid and must be
1149 /// unlocked via UnlockQueryFile(fid).
1150 
1151 Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
1152 {
1153  // We do not need to lock our own session
1154  if (strstr(sessiontag, fTopSessionTag))
1155  return 0;
1156 
1157  if (!lck) {
1158  Info("LockSession","locker space undefined");
1159  return -1;
1160  }
1161  *lck = 0;
1162 
1163  // Check the format
1164  TString stag = sessiontag;
1165  TRegexp re("session-.*-.*-.*");
1166  Int_t i1 = stag.Index(re);
1167  if (i1 == kNPOS) {
1168  Info("LockSession","bad format: %s", sessiontag);
1169  return -1;
1170  }
1171  stag.ReplaceAll("session-","");
1172 
1173  // Drop query number, if any
1174  Int_t i2 = stag.Index(":q");
1175  if (i2 != kNPOS)
1176  stag.Remove(i2);
1177 
1178  // Make sure that parent process does not exist anylonger
1179  TString parlog = fSessionDir;
1180  parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
1181  parlog += stag;
1182  if (!gSystem->AccessPathName(parlog)) {
1183  Info("LockSession","parent still running: do nothing");
1184  return -1;
1185  }
1186 
1187  // Lock the query lock file
1188  TString qlock = fQueryLock->GetName();
1189  qlock.ReplaceAll(fTopSessionTag, stag);
1190 
1191  if (!gSystem->AccessPathName(qlock)) {
1192  *lck = new TProofLockPath(qlock);
1193  if (((*lck)->Lock()) < 0) {
1194  Info("LockSession","problems locking query lock file");
1195  SafeDelete(*lck);
1196  return -1;
1197  }
1198  }
1199 
1200  // We are done
1201  return 0;
1202 }
1203 
1204 ////////////////////////////////////////////////////////////////////////////////
1205 /// Send message to intermediate coordinator to release worker of last ordinal
1206 /// ord.
1207 
1208 void TXProofServ::ReleaseWorker(const char *ord)
1209 {
1210  if (gDebug > 2) Info("ReleaseWorker","releasing: %s", ord);
1211 
1212  ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
1213 }
Bool_t fMasterServ
Definition: TProofServ.h:125
Float_t fEffSessions
Definition: TProofServ.h:136
FILE * fLogFile
Definition: TProofServ.h:114
Bool_t fRealTimeLog
Definition: TProofServ.h:150
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
TXSocketHandler * fInputHandler
Definition: TXProofServ.h:38
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1266
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Definition: TProof.cxx:2254
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Int_t Setup()
Print the ProofServ logo on standard output.
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:242
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "...
virtual Bool_t IsValid() const
Definition: TSocket.h:162
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:712
Int_t fActSessions
Definition: TProofServ.h:135
TString fConfFile
Definition: TProofServ.h:94
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:405
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
Definition: TSystem.cxx:1649
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
void LogToMaster(Bool_t on=kTRUE)
Definition: TProofServ.h:336
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
static Long_t fgResMemMax
Definition: TProofServ.h:174
void SetCompressionSettings(Int_t settings=1)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level...
Definition: TSocket.cxx:1103
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
const double pi
Int_t CreateServer()
Finalize the server setup.
Collectable string class.
Definition: TObjString.h:32
float Float_t
Definition: RtypesCore.h:53
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'. ...
Definition: TProof.cxx:2386
virtual Int_t Reconnect()
Definition: TSocket.h:168
void InterruptCurrentMonitor()
If active in a monitor, set the ready state.
Definition: TProof.cxx:11307
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition: TSocket.cxx:1017
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'...
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
TString fSessionTag
Definition: TProofServ.h:97
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TH1 * h
Definition: legend2.C:5
TVirtualProofPlayer * fPlayer
Definition: TProofServ.h:113
Bool_t UnlinkDataDir(const char *path)
Recursively scan the datadir and unlink it if empty. Return kTRUE if it can be unlinked, kFALSE otherwise.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
const char *const kPROOF_WorkDir
Definition: TProof.h:154
Bool_t NoLogOpt() const
Definition: TApplication.h:144
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:581
TString fGroup
Definition: TProofServ.h:92
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:822
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition: TSystem.cxx:537
Regular expression class.
Definition: TRegexp.h:35
const char * GetName() const
Returns name of object.
Definition: TParameter.h:76
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:857
const char *const XPD_GW_QueryEnqueued
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
#define gROOT
Definition: TROOT.h:364
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:582
void SetVal(const AParamType &val)
Definition: TParameter.h:79
Int_t LoadPlugin()
Load the plugin library for this handler.
TList * Queries() const
Basic string class.
Definition: TString.h:137
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Definition: TProof.cxx:11780
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:997
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
const char *const kRM
Definition: TProof.h:172
#define gInterpreter
Definition: TInterpreter.h:517
const TString & GetImage() const
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1512
TString fImage
Definition: TProofServ.h:96
TString fService
Definition: TProofServ.h:90
#define NAME_DEBUG
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
void HandleUrgentData()
Handle high priority data sent by the master or client.
void DisableTimeout()
Disable read timeout on the underlying socket.
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:592
void EnableTimeout()
Enable read timeout on the underlying socket.
Int_t fInt3
Definition: TXSocket.h:65
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:569
static Long_t fgVirtMemMax
Definition: TProofServ.h:173
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
Definition: TXSocket.cxx:2284
Int_t fInt2
Definition: TXSocket.h:64
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2563
static Int_t GetErrno()
Static function returning system error number.
Definition: TSystem.cxx:265
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:751
Implementation of TXSocket using PF_UNIX sockets.
Definition: TXUnixSocket.h:31
virtual Bool_t Notify()
Notify when signal occurs.
#define SafeDelete(p)
Definition: RConfig.h:507
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1347
TSocket * fSocket
Definition: TProofServ.h:111
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
Definition: TProof.cxx:6196
#define PDB(mask, level)
Definition: TProofDebug.h:58
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
Definition: TSystem.cxx:442
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
Bool_t IsEndMaster() const
Definition: TProof.h:694
TXProofServInterruptHandler * fInterruptHandler
Definition: TXProofServ.h:37
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1628
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2647
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2221
virtual void ExitLoop()
Exit from event loop.
Definition: TSystem.cxx:397
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition: TSystem.cxx:1564
TString fDataDir
Definition: TProofServ.h:104
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TProof * fProof
Definition: TProofServ.h:112
TString fConfDir
Definition: TProofServ.h:93
TString flog
Definition: pq2main.cxx:37
TString fOrdinal
Definition: TProofServ.h:118
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
TString fAdminPath
Definition: TProofServ.h:106
Int_t fInt4
Definition: TXSocket.h:66
A doubly linked list.
Definition: TList.h:47
Int_t fGroupPriority
Definition: TProofServ.h:123
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
Definition: TXProofServ.h:34
Float_t fCpuTime
Definition: TProofServ.h:128
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
void Terminate(Int_t status)
Terminate the proof server.
Named parameter, streamable and storable.
Definition: TParameter.h:49
Int_t fProtocol
Definition: TProofServ.h:117
TString fUser
Definition: TSystem.h:152
TList * fActiveSlaves
Definition: TProof.h:507
static Int_t fgLogToSysLog
Definition: TProofServ.h:186
The purpose of this class is to provide a complete node description for masters, submasters and worke...
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:557
const TString & GetNodeName() const
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Class providing the PROOF server.
Definition: TProofServ.h:80
Int_t fCompressMsg
Definition: TProofServ.h:156
TString fSessionDir
Definition: TProofServ.h:99
Long_t ExecPlugin(int nargs, const T &... params)
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:496
virtual ~TXProofServ()
Cleanup.
High level handler of connections to XProofD.
Definition: TXSocket.h:73
Long_t fMemVirtual
Definition: TSystem.h:207
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
void SetActive(Bool_t=kTRUE)
Definition: TProof.h:1018
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2322
TIdleTOTimer * fIdleTOTimer
Definition: TProofServ.h:154
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
char * Form(const char *fmt,...)
Ssiz_t Length() const
Definition: TString.h:390
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
This class implements a plugin library manager.
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
Definition: TProof.cxx:4724
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition: TSystem.cxx:658
const Int_t kPROOF_Protocol
Definition: TProof.h:150
void Interrupt()
Definition: TProofServ.h:305
TString fUser
Definition: TProofServ.h:91
Bool_t fIdle
Definition: TProofServ.h:143
TShutdownTimer * fShutdownTimer
Definition: TProofServ.h:152
#define Printf
Definition: TGeoToOCC.h:18
const TString & GetOrdinal() const
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
long Long_t
Definition: RtypesCore.h:50
int Ssiz_t
Definition: RtypesCore.h:63
virtual Bool_t IsEmpty() const
Definition: TCollection.h:99
void Close(Option_t *option="")
Close all open slave servers.
Definition: TProof.cxx:1776
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Int_t Argc() const
Definition: TApplication.h:141
#define ClassImp(name)
Definition: Rtypes.h:279
TString fTopSessionTag
Definition: TProofServ.h:98
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:308
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:865
Int_t fLogLevel
Definition: TProofServ.h:121
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:346
char ** Argv() const
Definition: TApplication.h:142
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
Float_t fRealTime
Definition: TProofServ.h:127
Int_t fInt1
Definition: TXSocket.h:63
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t fEndMaster
Definition: TProofServ.h:124
TQueryResult version adapted to PROOF needs.
Bool_t IsNull() const
Definition: TString.h:387
static TString fgSysLogEntity
Definition: TProofServ.h:188
const char *const XPD_GW_Static
TXProofServ(Int_t *argc, char **argv, FILE *flog=0)
Main constructor.
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
R__EXTERN Int_t gProofDebugLevel
Definition: TProofDebug.h:56
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2471
virtual Int_t GetDescriptor() const
Definition: TSocket.h:142
TString fWorkDir
Definition: TProofServ.h:95
static TXSockPipe fgPipe
Definition: TXSocket.h:125
#define snprintf
Definition: civetweb.c:822
const char * proto
Definition: civetweb.c:11652
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition: TString.cxx:444
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
const AParamType & GetVal() const
Definition: TParameter.h:77
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
Definition: TSystem.cxx:559
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:281
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
Definition: TApplication.h:45
Bool_t IsValid() const
Definition: TProof.h:967
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1244
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Definition: TProof.cxx:2429
TString fSockPath
Definition: TXProofServ.h:39
TList * fWaitingQueries
Definition: TProofServ.h:142
TString fQueryDir
Definition: TProofServ.h:102
Int_t fTotSessions
Definition: TProofServ.h:134
virtual Int_t GetSize() const
Definition: TCollection.h:95
const Bool_t kTRUE
Definition: Rtypes.h:91
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:155
Int_t fLogFileDes
Definition: TProofServ.h:115
Bool_t fTerminated
Definition: TXProofServ.h:41
Bool_t IsMaster() const
Definition: TProofServ.h:307
static volatile Int_t gProofServDebug
Definition: TXProofServ.cxx:60
Long_t fMemResident
Definition: TSystem.h:206
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911
if(line.BeginsWith("/*"))
Definition: HLFactory.cxx:443
TQueryResultManager * fQMgr
Definition: TProofServ.h:140
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
TProofLockPath * fQueryLock
Definition: TProofServ.h:109
const char * Data() const
Definition: TString.h:349