Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TXProofServ.cxx
Go to the documentation of this file.
1// @(#)root/proofx:$Id$
2// Author: Gerardo Ganis 12/12/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TXProofServ
13\ingroup proofx
14
15This class implements the XProofD version of TProofServ, with respect to which it differs only
16for the underlying connection technology.
17
18*/
19
20#include "RConfigure.h"
21#include <ROOT/RConfig.hxx>
22#include "Riostream.h"
23
24#ifdef WIN32
25 #include <io.h>
26 typedef long off_t;
27#endif
28#include <sys/types.h>
29#include <netinet/in.h>
30#include <utime.h>
31
32#include "TXProofServ.h"
33#include "TObjString.h"
34#include "TEnv.h"
35#include "TError.h"
36#include "TException.h"
37#include "THashList.h"
38#include "TInterpreter.h"
39#include "TParameter.h"
40#include "TProofDebug.h"
41#include "TProof.h"
42#include "TVirtualProofPlayer.h"
43#include "TQueryResultManager.h"
44#include "TRegexp.h"
45#include "TClass.h"
46#include "TROOT.h"
47#include "TSystem.h"
48#include "TPluginManager.h"
49#include "TXSocketHandler.h"
50#include "TXUnixSocket.h"
51#include "compiledata.h"
52#include "TProofNodeInfo.h"
53#include "XProofProtocol.h"
54#include "snprintf.h"
55
58
59
60// debug hook
61static volatile Int_t gProofServDebug = 1;
62
63//----- SigPipe signal handler -------------------------------------------------
64////////////////////////////////////////////////////////////////////////////////
65
66class TXProofServSigPipeHandler : public TSignalHandler {
67 TXProofServ *fServ;
68public:
69 TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
70 { fServ = s; }
71 Bool_t Notify();
72};
73
74////////////////////////////////////////////////////////////////////////////////
75
76Bool_t TXProofServSigPipeHandler::Notify()
77{
78 fServ->HandleSigPipe();
79 return kTRUE;
80}
81
82//----- Termination signal handler ---------------------------------------------
83////////////////////////////////////////////////////////////////////////////////
84
85class TXProofServTerminationHandler : public TSignalHandler {
86 TXProofServ *fServ;
87public:
88 TXProofServTerminationHandler(TXProofServ *s)
89 : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
90 Bool_t Notify();
91};
92
93////////////////////////////////////////////////////////////////////////////////
94
95Bool_t TXProofServTerminationHandler::Notify()
96{
97 Printf("Received SIGTERM: terminating");
98
99 fServ->HandleTermination();
100 return kTRUE;
101}
102
103//----- Seg violation signal handler ---------------------------------------------
104////////////////////////////////////////////////////////////////////////////////
105
106class TXProofServSegViolationHandler : public TSignalHandler {
107 TXProofServ *fServ;
108public:
109 TXProofServSegViolationHandler(TXProofServ *s)
111 Bool_t Notify();
112};
113
114////////////////////////////////////////////////////////////////////////////////
115
116Bool_t TXProofServSegViolationHandler::Notify()
117{
118 Printf("**** ");
119 Printf("**** Segmentation violation: terminating ****");
120 Printf("**** ");
121 fServ->HandleTermination();
122 return kTRUE;
123}
124
125//----- Input handler for messages from parent or master -----------------------
126////////////////////////////////////////////////////////////////////////////////
127
128class TXProofServInputHandler : public TFileHandler {
129 TXProofServ *fServ;
130public:
131 TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
132 { fServ = s; }
133 Bool_t Notify();
134 Bool_t ReadNotify() { return Notify(); }
135};
136
137////////////////////////////////////////////////////////////////////////////////
138
139Bool_t TXProofServInputHandler::Notify()
140{
141 fServ->HandleSocketInput();
142 // This request has been completed: remove the client ID from the pipe
143 ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
144 return kTRUE;
145}
146
148
149// Hook to the constructor. This is needed to avoid using the plugin manager
150// which may create problems in multi-threaded environments.
151extern "C" {
152 TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
153 { return new TXProofServ(argc, argv, flog); }
154}
155
156////////////////////////////////////////////////////////////////////////////////
157/// Main constructor
158
159TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
160 : TProofServ(argc, argv, flog)
161{
163 fInputHandler = 0;
165
166 // TODO:
167 // Int_t useFIFO = 0;
168/* if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
169 if (useFIFO == 1)
170 Info("", "enablig use of FIFO (if allowed by the server)");
171 else
172 Warning("", "unsupported strategy index (%d): ignore", strategy);
173 }
174*/
175}
176
177////////////////////////////////////////////////////////////////////////////////
178/// Finalize the server setup. If master, create the TProof instance to talk
179/// to the worker or submaster nodes.
180/// Return 0 on success, -1 on error
181
183{
184 Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
185
186 if (gProofDebugLevel > 0)
187 Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
188
189 // Get file descriptor for log file
190 if (fLogFile) {
191 // Use the file already open by pmain
192 if ((fLogFileDes = fileno(fLogFile)) < 0) {
193 Error("CreateServer", "resolving the log file description number");
194 return -1;
195 }
196 // Hide the session start-up logs unless we are in verbose mode
197 if (gProofDebugLevel <= 0)
198 lseek(fLogFileDes, (off_t) 0, SEEK_END);
199 }
200
201 // Global location string in TXSocket
202 TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
203
204 // Set debug level in XrdClient
205 EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
206
207 // Get socket to be used to call back our xpd
208 if (xtest) {
209 // test session, just send the protocol version on the open pipe
210 // and exit
211 if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
212 Error("CreateServer", "test: socket setup by xpd undefined");
213 return -1;
214 }
215 Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
216 int proto = htonl(kPROOF_Protocol);
217 fSockPath = "";
218 if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
219 Error("CreateServer", "test: sending protocol number");
220 return -1;
221 }
222 exit(0);
223 } else {
224 fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
225 if (fSockPath.Length() <= 0) {
226 Error("CreateServer", "socket setup by xpd undefined");
227 return -1;
228 }
229 TString entity = gEnv->GetValue("ProofServ.Entity", "");
230 if (entity.Length() > 0)
231 fSockPath.Insert(0,Form("%s/", entity.Data()));
232 }
233
234 // Get open socket descriptor, if any
235 Int_t sockfd = -1;
236 const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
237 if (opensock && strlen(opensock) > 0) {
239 sockfd = (Int_t) strtol(opensock, 0, 10);
240 if (TSystem::GetErrno() == ERANGE) {
241 sockfd = -1;
242 Warning("CreateServer", "socket descriptor: wrong conversion from '%s'", opensock);
243 }
244 if (sockfd > 0 && gProofDebugLevel > 0)
245 Info("CreateServer", "using open connection (descriptor %d)", sockfd);
246 }
247
248 // Get the sessions ID
249 Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
250 if (psid < 0) {
251 Error("CreateServer", "Session ID undefined");
252 return -1;
253 }
254
255 // Call back the server
256 fSocket = new TXUnixSocket(fSockPath, psid, -1, this, sockfd);
257 if (!fSocket || !(fSocket->IsValid())) {
258 Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
259 return -1;
260 }
261 // Set compression level, if any
263
264 // Set the title for debugging
265 TString tgt("client");
266 if (fOrdinal != "0") {
267 tgt = fOrdinal;
268 if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
269 }
270 fSocket->SetTitle(tgt);
271
272 // Set this object as the reference of this socket
273 ((TXSocket *)fSocket)->fReference = this;
274
275 // Get socket descriptor
276 Int_t sock = fSocket->GetDescriptor();
277
278 // Install message input handlers
280 TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
282
283 // Get the client ID
284 Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
285 if (cid < 0) {
286 Error("CreateServer", "Client ID undefined");
287 SendLogFile();
288 return -1;
289 }
290 ((TXSocket *)fSocket)->SetClientID(cid);
291
292 // debug hooks
293 if (IsMaster()) {
294 // wait (loop) in master to allow debugger to connect
295 if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
296 while (gProofServDebug)
297 ;
298 }
299 } else {
300 // wait (loop) in slave to allow debugger to connect
301 if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
302 while (gProofServDebug)
303 ;
304 }
305 }
306
307 if (gProofDebugLevel > 0)
308 Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
310
311 if (Setup() == -1) {
312 // Setup failure
313 LogToMaster();
314 SendLogFile();
315 Terminate(0);
316 return -1;
317 }
318
319 if (!fLogFile) {
321 // If for some reason we failed setting a redirection file for the logs
322 // we cannot continue
323 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
324 LogToMaster();
325 SendLogFile(-98);
326 Terminate(0);
327 return -1;
328 }
329 }
330
331 // Send message of the day to the client
332 if (IsMaster()) {
333 if (CatMotd() == -1) {
334 LogToMaster();
335 SendLogFile(-99);
336 Terminate(0);
337 return -1;
338 }
339 }
340
341 // Everybody expects iostream to be available, so load it...
342 ProcessLine("#include <iostream>", kTRUE);
343 ProcessLine("#include <string>",kTRUE); // for std::string iostream.
344
345 // Load user functions
346 const char *logon;
347 logon = gEnv->GetValue("Proof.Load", (char *)0);
348 if (logon) {
349 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
350 if (mac)
351 ProcessLine(Form(".L %s", logon), kTRUE);
352 delete [] mac;
353 }
354
355 // Execute logon macro
356 logon = gEnv->GetValue("Proof.Logon", (char *)0);
357 if (logon && !NoLogOpt()) {
358 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
359 if (mac)
360 ProcessFile(logon);
361 delete [] mac;
362 }
363
364 // Save current interpreter context
365 gInterpreter->SaveContext();
366 gInterpreter->SaveGlobalsContext();
367
368 // if master, start slave servers
369 if (IsMaster()) {
370 TString master;
371
372 if (fConfFile.BeginsWith("lite:")) {
373 master = "lite://";
374 } else {
375 master.Form("proof://%s@__master__", fUser.Data());
376
377 // Add port, if defined
378 Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
379 if (port > -1) {
380 master += ":";
381 master += port;
382 }
383 }
384
385 // Make sure that parallel startup via threads is not active
386 // (it is broken for xpd because of the locks on gInterpreterMutex)
387 gEnv->SetValue("Proof.ParallelStartup", 0);
388
389 // Get plugin manager to load appropriate TProof from
390 TPluginManager *pm = gROOT->GetPluginManager();
391 if (!pm) {
392 Error("CreateServer", "no plugin manager found");
393 SendLogFile(-99);
394 Terminate(0);
395 return -1;
396 }
397
398 // Find the appropriate handler
399 TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
400 if (!h) {
401 Error("CreateServer", "no plugin found for TProof with a"
402 " config file of '%s'", fConfFile.Data());
403 SendLogFile(-99);
404 Terminate(0);
405 return -1;
406 }
407
408 // load the plugin
409 if (h->LoadPlugin() == -1) {
410 Error("CreateServer", "plugin for TProof could not be loaded");
411 SendLogFile(-99);
412 Terminate(0);
413 return -1;
414 }
415
416 // Make instance of TProof
417 if (fConfFile.BeginsWith("lite:")) {
418 // Remove input and signal handlers to avoid spurious "signals"
419 // during startup
421 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(6, master.Data(),
422 0, 0,
423 fLogLevel,
424 fSessionDir.Data(), 0));
425 // Re-enable input and signal handlers
427 } else {
428 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
429 fConfFile.Data(),
430 fConfDir.Data(),
431 fLogLevel,
433 }
434
435 // Save worker info
437
438 if (!fProof || (fProof && !fProof->IsValid())) {
439 Error("CreateServer", "plugin for TProof could not be executed");
440 FlushLogFile();
441 delete fProof;
442 fProof = 0;
443 SendLogFile(-99);
444 Terminate(0);
445 return -1;
446 }
447 // Find out if we are a master in direct contact only with workers
449
450 SendLogFile();
451 }
452
453 // Setup the shutdown timer
454 if (!fShutdownTimer) {
455 // Check activity on socket every 5 mins
456 fShutdownTimer = new TShutdownTimer(this, 300000);
458 }
459
460 // Check if schema evolution is effective: clients running versions <=17 do not
461 // support that: send a warning message
462 if (fProtocol <= 17) {
463 TString msg;
464 msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
465 " This may generate compatibility problems between streamed objects.\n"
466 " The advise is to move to ROOT >= 5.21/02 .");
467 SendAsynMessage(msg.Data());
468 }
469
470 // Setup the idle timer
471 if (IsMaster() && !fIdleTOTimer) {
472 // Start the idle timer only if a positive idle timeout (in seconds) was requested
473 Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
474 if (idle_to > 0) {
475 fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
477 if (gProofDebugLevel > 0)
478 Info("CreateServer", " idle timer started (%d secs)", idle_to);
479 } else if (gProofDebugLevel > 0) {
480 Info("CreateServer", " idle timer not started (no idle timeout requested)");
481 }
482 }
483
484 // Done
485 return 0;
486}
487
488////////////////////////////////////////////////////////////////////////////////
489/// Cleanup. Not really necessary, since nothing is left alive after this
490/// destructor has run.
491
493{
494 delete fSocket;
495}
496
497////////////////////////////////////////////////////////////////////////////////
498/// Handle high priority data sent by the master or client.
499
501{
502 // Real-time notification of messages
504
505 // Get interrupt
506 Bool_t fw = kFALSE;
507 Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
508 if (iLev < 0) {
509 Error("HandleUrgentData", "error receiving interrupt");
510 return;
511 }
512
513 PDB(kGlobal, 2)
514 Info("HandleUrgentData", "got interrupt: %d\n", iLev);
515
516 if (fProof)
517 fProof->SetActive();
518
519 switch (iLev) {
520
521 case TProof::kPing:
522 PDB(kGlobal, 2)
523 Info("HandleUrgentData", "*** Ping");
524
525 // If master server, propagate interrupt to slaves
526 if (fw && IsMaster()) {
528 if (nbad > 0) {
529 Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
530 }
531 }
532
533 // Touch the admin path to show we are alive
534 if (fAdminPath.IsNull()) {
535 fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
536 }
537
538 if (!fAdminPath.IsNull()) {
539 if (!fAdminPath.EndsWith(".status")) {
540 // Update file time stamps
541 if (utime(fAdminPath.Data(), 0) != 0)
542 Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
543 else
544 PDB(kGlobal, 2)
545 Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
546 } else {
547 // Update the status in the file
548 // 0 idle
549 // 1 running
550 // 2 being terminated (currently unused)
551 // 3 queued
552 // 4 idle timed-out
553 Int_t uss_rc = UpdateSessionStatus(-1);
554 if (uss_rc != 0)
555 Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
556 }
557 } else {
558 Info("HandleUrgentData", "admin path undefined");
559 }
560
561 break;
562
564 Info("HandleUrgentData", "*** Hard Interrupt");
565
566 // If master server, propagate interrupt to slaves
567 if (fw && IsMaster())
569
570 // Flush input socket
571 ((TXSocket *)fSocket)->Flush();
572
573 if (IsMaster())
574 SendLogFile();
575
576 break;
577
579 Info("HandleUrgentData", "Soft Interrupt");
580
581 // If master server, propagate interrupt to slaves
582 if (fw && IsMaster())
584
585 Interrupt();
586
587 if (IsMaster())
588 SendLogFile();
589
590 break;
591
592
594 Info("HandleUrgentData", "Shutdown Interrupt");
595
596 // When returning from here the connections are closed
598
599 break;
600
601 default:
602 Error("HandleUrgentData", "unexpected type: %d", iLev);
603 break;
604 }
605
606
608}
609
610////////////////////////////////////////////////////////////////////////////////
611/// Called when a SIGPIPE notification is received: the event is only logged, nothing else is done.
612
614{
615 // Real-time notification of messages
616
617 Info("HandleSigPipe","got sigpipe ... do nothing");
618}
619
620////////////////////////////////////////////////////////////////////////////////
621/// Called when the client is not alive anymore; terminate the session.
622
624{
625 // If master server, propagate interrupt to slaves
626 // (shutdown interrupt send internally).
627 if (IsMaster()) {
628
629 // If not idle, try first to stop processing
630 if (!fIdle) {
631 // Remove pending requests
633 // Interrupt the current monitor
635 // Do not wait forever, but at least 20 seconds
636 Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
637 timeout = (timeout > 20) ? timeout : 20;
638 // Processing will be aborted
639 fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
640 // Receive end-of-processing messages, but do not wait for ever
641 fProof->Collect(TProof::kActive, timeout);
642 // Still not idle
643 if (!fIdle)
644 Warning("HandleTermination","processing could not be stopped");
645 }
646 // Close the session
647 if (fProof)
648 fProof->Close("S");
649 }
650
651 Terminate(0); // will not return from here....
652}
653
654////////////////////////////////////////////////////////////////////////////////
655/// Send the startup message over the socket, read the session settings and install the signal handlers.
656/// Return 0 on success, -1 on error
657
659{
660 char str[512];
661
662 if (IsMaster()) {
663 snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
664 } else {
665 snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
666 }
667
668 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
669 Error("Setup", "failed to send proof server startup message");
670 return -1;
671 }
672
673 // Get client protocol
674 if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
675 Error("Setup", "remote proof protocol missing");
676 return -1;
677 }
678
679 // The local user
680 fUser = gEnv->GetValue("ProofServ.Entity", "");
681 if (fUser.Length() >= 0) {
682 if (fUser.Contains(":"))
683 fUser.Remove(fUser.Index(":"));
684 if (fUser.Contains("@"))
685 fUser.Remove(fUser.Index("@"));
686 } else {
688 if (pw) {
689 fUser = pw->fUser;
690 delete pw;
691 }
692 }
693
694 // Work dir and ...
695 if (IsMaster()) {
696 TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
697 if (cf.Length() > 0)
698 fConfFile = cf;
699 }
700 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
701
702 // Get Session tag
703 if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
704 Error("Setup", "Session tag missing");
705 return -1;
706 }
707 // Get top session tag, i.e. the tag of the PROOF session
708 if ((fTopSessionTag = gEnv->GetValue("ProofServ.TopSessionTag", "-1")) == "-1") {
709 fTopSessionTag = "";
710 // Try to extract it from log file path (for backward compatibility)
711 if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
712 fTopSessionTag = gSystem->GetDirName(gSystem->Getenv("ROOTPROOFLOGFILE"));
713 Ssiz_t lstl;
714 if ((lstl = fTopSessionTag.Last('/')) != kNPOS) fTopSessionTag.Remove(0, lstl + 1);
715 if (fTopSessionTag.BeginsWith("session-")) {
716 fTopSessionTag.Remove(0, strlen("session-"));
717 } else {
718 fTopSessionTag = "";
719 }
720 }
721 if (fTopSessionTag.IsNull()) {
722 Error("Setup", "top session tag missing");
723 return -1;
724 }
725 }
726
727 // Make sure the process ID is in the tag
728 TString spid = Form("-%d", gSystem->GetPid());
729 if (!fSessionTag.EndsWith(spid)) {
730 Int_t nd = 0;
731 if ((nd = fSessionTag.CountChar('-')) >= 2) {
732 Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
733 if (id != kNPOS) fSessionTag.Remove(id);
734 } else if (nd != 1) {
735 Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
736 }
737 // Add this process ID
738 fSessionTag += spid;
739 }
740 if (gProofDebugLevel > 0)
741 Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
742
743 // Get Session dir (sandbox)
744 if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
745 Error("Setup", "Session dir missing");
746 return -1;
747 }
748
749 // Goto to the main PROOF working directory
751 if (gProofDebugLevel > 0)
752 Info("Setup", "working directory set to %s", fWorkDir.Data());
753
754 // Common setup
755 if (SetupCommon() != 0) {
756 Error("Setup", "common setup failed");
757 return -1;
758 }
759
760 // Send packages off immediately to reduce latency
762
763 // Check every two hours if client is still alive
765
766 // Install SigPipe handler to handle kKeepAlive failure
767 gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
768
769 // Install Termination handler
770 gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
771
772 // Install seg violation handler
773 gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
774
775 if (gProofDebugLevel > 0)
776 Info("Setup", "successfully completed");
777
778 // Done
779 return 0;
780}
781
782////////////////////////////////////////////////////////////////////////////////
783/// Get list of workers to be used from now on.
784/// The list must be provided by the caller.
785
787 Int_t & /* prioritychange */,
788 Bool_t resume)
789{
791
792 // User config files, when enabled, override cluster-wide configuration
793 if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
794 Int_t pc = 1;
795 return TProofServ::GetWorkers(workers, pc);
796 }
797
798 // seqnum of the query for which we call getworkers
799 Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
800 TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
801 if (!fWaitingQueries->IsEmpty()) {
802 if (resume) {
803 seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
804 } else {
805 seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
806 }
807 }
808 // Send request to the coordinator
809 TObjString *os = 0;
810 if (dynamicStartup) {
811 // We wait dynto seconds for the first worker to come; -1 means forever
812 Int_t dynto = gEnv->GetValue("Proof.DynamicStartupTimeout", -1);
813 Bool_t doto = (dynto > 0) ? kTRUE : kFALSE;
814 while (!(os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data()))) {
815 if (doto > 0 && --dynto < 0) break;
816 // Another second
817 gSystem->Sleep(1000);
818 }
819 } else {
820 os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
821 }
822
823 // The reply contains some information about the master (image, workdir)
824 // followed by the information about the workers; the tokens for each node
825 // are separated by '&'
826 if (os) {
827 TString fl(os->GetName());
829 SendAsynMessage("+++ Query cannot be processed now: enqueued");
830 return kQueryEnqueued;
831 }
832
833 // Honour a max number of workers request (typically when running in valgrind)
834 Int_t nwrks = -1;
835 Bool_t pernode = kFALSE;
836 if (gSystem->Getenv("PROOF_NWORKERS")) {
837 TString s(gSystem->Getenv("PROOF_NWORKERS"));
838 if (s.EndsWith("x")) {
839 pernode = kTRUE;
840 s.ReplaceAll("x", "");
841 }
842 if (s.IsDigit()) {
843 nwrks = s.Atoi();
844 if (!dynamicStartup && (nwrks > 0)) {
845 // Notify, except in dynamic workers mode to avoid flooding
846 TString msg;
847 if (pernode) {
848 msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
849 } else {
850 msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
851 }
852 SendAsynMessage(msg);
853 } else {
854 nwrks = -1;
855 }
856 } else {
857 pernode = kFALSE;
858 }
859 }
860
861 TString tok;
862 Ssiz_t from = 0;
863 TList *nodecnt = (pernode) ? new TList : 0 ;
864 if (fl.Tokenize(tok, from, "&")) {
865 if (!tok.IsNull()) {
866 TProofNodeInfo *master = new TProofNodeInfo(tok);
867 if (!master) {
868 Error("GetWorkers", "no appropriate master line got from coordinator");
869 return kQueryStop;
870 } else {
871 // Set image if not yet done and available
872 if (fImage.IsNull() && strlen(master->GetImage()) > 0)
873 fImage = master->GetImage();
874 SafeDelete(master);
875 }
876 // Now the workers
877 while (fl.Tokenize(tok, from, "&")) {
878 if (!tok.IsNull()) {
879 if (nwrks == -1 || nwrks > 0) {
880 // We have the minimal set of information to start
881 rc = kQueryOK;
882 if (pernode && nodecnt) {
883 TProofNodeInfo *ni = new TProofNodeInfo(tok);
884 TParameter<Int_t> *p = 0;
885 Int_t nw = 0;
886 if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
887 p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
888 nodecnt->Add(p);
889 }
890 nw = p->GetVal();
891 if (gDebug > 0)
892 Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
893 p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
894 if (nw < nwrks) {
895 if (workers) workers->Add(ni);
896 nw++;
897 p->SetVal(nw);
898 } else {
899 // Too many workers on this machine already
900 SafeDelete(ni);
901 }
902 } else {
903 if (workers)
904 workers->Add(new TProofNodeInfo(tok));
905 // Count down
906 if (nwrks != -1) nwrks--;
907 }
908 } else {
909 // Release this worker (to cleanup the session list in the coordinator and get a fresh
910 // and correct list next call)
911 TProofNodeInfo *ni = new TProofNodeInfo(tok);
913 }
914 }
915 }
916 }
917 }
918 // Cleanup
919 if (nodecnt) {
920 nodecnt->SetOwner(kTRUE);
921 SafeDelete(nodecnt);
922 }
923 }
924
925 // We are done
926 return rc;
927}
928
929////////////////////////////////////////////////////////////////////////////////
930/// Handle error on the input socket
931
933{
934 // Try reconnection
935 if (fSocket && !fSocket->IsValid()) {
936
938 if (fSocket && fSocket->IsValid()) {
939 if (gDebug > 0)
940 Info("HandleError",
941 "%p: connection to local coordinator re-established", this);
942 FlushLogFile();
943 return kFALSE;
944 }
945 }
946 Printf("TXProofServ::HandleError: %p: got called ...", this);
947
948 // If master server, propagate interrupt to slaves
949 // (shutdown interrupt send internally).
950 if (IsMaster())
951 fProof->Close("S");
952
953 // Avoid communicating back anything to the coordinator (it is gone)
954 if (fSocket) ((TXSocket *)fSocket)->SetSessionID(-1);
955
956 Terminate(0);
957
958 Printf("TXProofServ::HandleError: %p: DONE ... ", this);
959
960 // We are done
961 return kTRUE;
962}
963
964////////////////////////////////////////////////////////////////////////////////
965/// Handle asynchronous input on the input socket
966
968{
969 if (gDebug > 2)
970 Printf("TXProofServ::HandleInput %p, in: %p", this, in);
971
972 XHandleIn_t *hin = (XHandleIn_t *) in;
973 Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
974
975 // Act accordingly
976 if (acod == kXPD_ping || acod == kXPD_interrupt) {
977 // Interrupt or Ping
979
980 } else if (acod == kXPD_flush) {
981 // Flush stdout, so that we can access the full log file
982 Info("HandleInput","kXPD_flush: flushing log file (stdout)");
983 fflush(stdout);
984
985 } else if (acod == kXPD_urgent) {
986 // Get type
987 Int_t type = hin->fInt2;
988 switch (type) {
990 {
991 // Abort or Stop ?
992 Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
993 // Timeout
994 Int_t timeout = hin->fInt4;
995 // Act now
996 if (fProof)
997 fProof->StopProcess(abort, timeout);
998 else
999 if (fPlayer)
1000 fPlayer->StopProcess(abort, timeout);
1001 }
1002 break;
1003 default:
1004 Info("HandleInput","kXPD_urgent: unknown type: %d", type);
1005 }
1006
1007 } else if (acod == kXPD_inflate) {
1008
1009 // Obsolete type
1010 Warning("HandleInput", "kXPD_inflate: obsolete message type");
1011
1012 } else if (acod == kXPD_priority) {
1013
1014 // The factor is the priority to be propagated
1015 fGroupPriority = hin->fInt2;
1016 if (fProof)
1018 // Notify
1019 Info("HandleInput", "kXPD_priority: group %s priority set to %f",
1020 fGroup.Data(), (Float_t) fGroupPriority / 100.);
1021
1022 } else if (acod == kXPD_clusterinfo) {
1023
1024 // Information about the cluster status
1025 fTotSessions = hin->fInt2;
1026 fActSessions = hin->fInt3;
1027 fEffSessions = (hin->fInt4)/1000.;
1028 // Notify
1029 Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1031
1032 } else {
1033 // Standard socket input
1035 // This request has been completed: remove the client ID from the pipe
1036 ((TXSocket *)fSocket)->RemoveClientID();
1037 }
1038
1039 // We are done
1040 return kTRUE;
1041}
1042
1043////////////////////////////////////////////////////////////////////////////////
1044/// Disable read timeout on the underlying socket
1045
1047{
1048 if (fSocket)
1049 ((TXSocket *)fSocket)->DisableTimeout();
1050}
1051
1052////////////////////////////////////////////////////////////////////////////////
1053/// Enable read timeout on the underlying socket
1054
1056{
1057 if (fSocket)
1058 ((TXSocket *)fSocket)->EnableTimeout();
1059}
1060
1061////////////////////////////////////////////////////////////////////////////////
1062/// Terminate the proof server.
1063
1065{
1066 if (fTerminated)
1067 // Avoid doubling the exit operations
1068 exit(1);
1070
1071 // Notify
1072 Info("Terminate", "starting session termination operations ...");
1073 if (fgLogToSysLog > 0) {
1074 TString s;
1075 s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
1077 }
1078
1079 // Notify the memory footprint
1080 ProcInfo_t pi;
1081 if (!gSystem->GetProcInfo(&pi)){
1082 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1084 }
1085
1086 // Deactivate current monitor, if any
1087 if (fProof)
1089
1090 // Cleanup session directory
1091 if (status == 0) {
1092 // make sure we remain in a "connected" directory
1094 // needed in case fSessionDir is on NFS ?!
1095 gSystem->MakeDirectory(fSessionDir+"/.delete");
1096 gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
1097 }
1098
1099 // Cleanup queries directory if empty
1100 if (IsMaster()) {
1101 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
1102 // make sure we remain in a "connected" directory
1104 // needed in case fQueryDir is on NFS ?!
1105 gSystem->MakeDirectory(fQueryDir+"/.delete");
1106 gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
1107 // Remove lock file
1108 if (fQueryLock)
1110 }
1111
1112 // Unlock the query dir owned by this session
1113 if (fQueryLock)
1114 fQueryLock->Unlock();
1115 } else {
1116 // Try to stop processing if any
1117 Bool_t abort = (status == 0) ? kFALSE : kTRUE;
1118 if (!fIdle && fPlayer)
1119 fPlayer->StopProcess(abort,1);
1120 gSystem->Sleep(2000);
1121 }
1122
1123 // Cleanup data directory if empty
1126 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
1127 }
1128
1129 // Remove input and signal handlers to avoid spurious "signals"
1130 // for closing activities executed upon exit()
1132
1133 // Stop processing events (set a flag to exit the event loop)
1134 gSystem->ExitLoop();
1135
1136 // We post the pipe once to wake up the main thread which is waiting for
1137 // activity on this socket; this fake activity will make it return and
1138 // eventually exit the loop.
1140
1141 // Notify
1142 Printf("Terminate: termination operations ended: quitting!");
1143}
1144
1145////////////////////////////////////////////////////////////////////////////////
1146/// Try locking query area of session tagged sessiontag.
1147/// The id of the locking file is returned in fid and must be
1148/// unlocked via UnlockQueryFile(fid).
1149
1150Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
1151{
1152 // We do not need to lock our own session
1153 if (strstr(sessiontag, fTopSessionTag))
1154 return 0;
1155
1156 if (!lck) {
1157 Info("LockSession","locker space undefined");
1158 return -1;
1159 }
1160 *lck = 0;
1161
1162 // Check the format
1163 TString stag = sessiontag;
1164 TRegexp re("session-.*-.*-.*");
1165 Int_t i1 = stag.Index(re);
1166 if (i1 == kNPOS) {
1167 Info("LockSession","bad format: %s", sessiontag);
1168 return -1;
1169 }
1170 stag.ReplaceAll("session-","");
1171
1172 // Drop query number, if any
1173 Int_t i2 = stag.Index(":q");
1174 if (i2 != kNPOS)
1175 stag.Remove(i2);
1176
1177 // Make sure that parent process does not exist anylonger
1178 TString parlog = fSessionDir;
1179 parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
1180 parlog += stag;
1181 if (!gSystem->AccessPathName(parlog)) {
1182 Info("LockSession","parent still running: do nothing");
1183 return -1;
1184 }
1185
1186 // Lock the query lock file
1187 TString qlock = fQueryLock->GetName();
1188 qlock.ReplaceAll(fTopSessionTag, stag);
1189
1190 if (!gSystem->AccessPathName(qlock)) {
1191 *lck = new TProofLockPath(qlock);
1192 if (((*lck)->Lock()) < 0) {
1193 Info("LockSession","problems locking query lock file");
1194 SafeDelete(*lck);
1195 return -1;
1196 }
1197 }
1198
1199 // We are done
1200 return 0;
1201}
1202
1203////////////////////////////////////////////////////////////////////////////////
1204/// Send message to intermediate coordinator to release worker of last ordinal
1205/// ord.
1206
1207void TXProofServ::ReleaseWorker(const char *ord)
1208{
1209 if (gDebug > 2) Info("ReleaseWorker","releasing: %s", ord);
1210
1211 ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
1212}
#define SafeDelete(p)
Definition RConfig.hxx:547
#define h(i)
Definition RSha256.hxx:106
const Ssiz_t kNPOS
Definition RtypesCore.h:115
int Int_t
Definition RtypesCore.h:45
const Bool_t kFALSE
Definition RtypesCore.h:92
long Long_t
Definition RtypesCore.h:54
bool Bool_t
Definition RtypesCore.h:63
float Float_t
Definition RtypesCore.h:57
const Bool_t kTRUE
Definition RtypesCore.h:91
#define ClassImp(name)
Definition Rtypes.h:364
R__EXTERN TEnv * gEnv
Definition TEnv.h:171
int type
Definition TGX11.cxx:121
#define gInterpreter
#define PDB(mask, level)
Definition TProofDebug.h:56
R__EXTERN Int_t gProofDebugLevel
Definition TProofDebug.h:54
static volatile Int_t gProofServDebug
const char *const kRM
Definition TProof.h:142
const char *const kPROOF_WorkDir
Definition TProof.h:124
const Int_t kPROOF_Protocol
Definition TProof.h:120
Int_t gDebug
Definition TROOT.cxx:590
#define gROOT
Definition TROOT.h:406
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
@ kSigTermination
@ kSigInterrupt
@ kSigSegmentationViolation
@ kKeepAlive
Definition TSystem.h:219
@ kNoDelay
Definition TSystem.h:221
@ kReadPermission
Definition TSystem.h:47
@ kWritePermission
Definition TSystem.h:46
@ kLogNotice
Definition TSystem.h:61
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
static volatile Int_t gProofServDebug
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
@ kReleaseWorker
@ kGetWorkers
const char *const XPD_GW_QueryEnqueued
@ kXPD_flush
@ kXPD_msg
@ kXPD_urgent
@ kXPD_clusterinfo
@ kXPD_interrupt
@ kXPD_ping
@ kXPD_inflate
@ kXPD_priority
const char *const XPD_GW_Static
#define NAME_DEBUG
#define EnvPutInt(name, val)
const char * proto
Definition civetweb.c:16604
#define snprintf
Definition civetweb.c:1540
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
char ** Argv() const
Bool_t NoLogOpt() const
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "....
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
Int_t Argc() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Bool_t IsEmpty() const
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
Set the value of a resource or create a new resource.
Definition TEnv.cxx:736
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
A doubly linked list.
Definition TList.h:44
virtual void Add(TObject *obj)
Definition TList.h:87
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition TList.cxx:578
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition TList.cxx:693
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition TList.cxx:470
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition TList.cxx:659
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:164
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
Collectable string class.
Definition TObjString.h:28
const char * GetName() const
Returns name of object.
Definition TObjString.h:38
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:879
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:893
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:867
Named parameter, streamable and storable.
Definition TParameter.h:35
void SetVal(const AParamType &val)
Definition TParameter.h:69
const AParamType & GetVal() const
Definition TParameter.h:67
const char * GetName() const
Returns name of object.
Definition TParameter.h:66
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Int_t Unlock()
Unlock the directory.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetNodeName() const
TQueryResult version adapted to PROOF needs.
Class providing the PROOF server.
Definition TProofServ.h:66
Float_t fCpuTime
Definition TProofServ.h:114
TString fQueryDir
Definition TProofServ.h:88
Int_t fProtocol
Definition TProofServ.h:103
void Interrupt()
Definition TProofServ.h:291
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Int_t fLogLevel
Definition TProofServ.h:107
FILE * fLogFile
Definition TProofServ.h:100
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
TString fDataDir
Definition TProofServ.h:90
static Long_t fgResMemMax
Definition TProofServ.h:160
TString fSessionDir
Definition TProofServ.h:85
TString fAdminPath
Definition TProofServ.h:92
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TString fService
Definition TProofServ.h:76
Int_t fLogFileDes
Definition TProofServ.h:101
TProofLockPath * fQueryLock
Definition TProofServ.h:95
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
Bool_t fMasterServ
Definition TProofServ.h:111
TSocket * fSocket
Definition TProofServ.h:97
Int_t fCompressMsg
Definition TProofServ.h:142
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
TVirtualProofPlayer * fPlayer
Definition TProofServ.h:99
TString fSessionTag
Definition TProofServ.h:83
Bool_t IsMaster() const
Definition TProofServ.h:293
TProof * fProof
Definition TProofServ.h:98
TString fOrdinal
Definition TProofServ.h:104
Bool_t fEndMaster
Definition TProofServ.h:110
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
TString fWorkDir
Definition TProofServ.h:81
TShutdownTimer * fShutdownTimer
Definition TProofServ.h:138
Int_t fActSessions
Definition TProofServ.h:121
TString fGroup
Definition TProofServ.h:78
Bool_t fRealTimeLog
Definition TProofServ.h:136
static Long_t fgVirtMemMax
Definition TProofServ.h:159
TString fImage
Definition TProofServ.h:82
TString fTopSessionTag
Definition TProofServ.h:84
Float_t fEffSessions
Definition TProofServ.h:122
TString fConfDir
Definition TProofServ.h:79
TIdleTOTimer * fIdleTOTimer
Definition TProofServ.h:140
static Int_t fgLogToSysLog
Definition TProofServ.h:172
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty. Return kTRUE if it can be unlinked,...
friend class TXProofServ
Definition TProofServ.h:69
TString fConfFile
Definition TProofServ.h:80
Int_t fTotSessions
Definition TProofServ.h:120
TString fUser
Definition TProofServ.h:77
TQueryResultManager * fQMgr
Definition TProofServ.h:126
TList * fWaitingQueries
Definition TProofServ.h:128
static TString fgSysLogEntity
Definition TProofServ.h:174
void LogToMaster(Bool_t on=kTRUE)
Definition TProofServ.h:322
Int_t fGroupPriority
Definition TProofServ.h:109
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Float_t fRealTime
Definition TProofServ.h:113
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
Bool_t fIdle
Definition TProofServ.h:129
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition TProof.h:316
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Definition TProof.cxx:2441
@ kShutdownInterrupt
Definition TProof.h:398
@ kHardInterrupt
Definition TProof.h:396
@ kPing
Definition TProof.h:395
@ kSoftInterrupt
Definition TProof.h:397
Bool_t IsEndMaster() const
Definition TProof.h:664
void Close(Option_t *option="")
Close all open slave servers.
Definition TProof.cxx:1788
TList * fActiveSlaves
Definition TProof.h:477
void InterruptCurrentMonitor()
If in active in a monitor set ready state.
Definition TProof.cxx:11325
Bool_t IsValid() const
Definition TProof.h:937
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition TProof.cxx:2659
void SetActive(Bool_t=kTRUE)
Definition TProof.h:988
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'.
Definition TProof.cxx:2398
@ kActive
Definition TProof.h:564
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
Definition TProof.cxx:4742
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
Definition TProof.cxx:6214
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Definition TProof.cxx:2266
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Definition TProof.cxx:11798
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition TROOT.cxx:2723
Regular expression class.
Definition TRegexp.h:31
virtual Bool_t Notify()
Notify when signal occurs.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition TSocket.cxx:1013
virtual Int_t GetDescriptor() const
Definition TSocket.h:112
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
Definition TSocket.cxx:1098
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Reconnect()
Definition TSocket.h:138
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:136
Ssiz_t Length() const
Definition TString.h:410
TString & Insert(Ssiz_t pos, const char *s)
Definition TString.h:649
Int_t Atoi() const
Return integer value of string.
Definition TString.cxx:1941
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition TString.cxx:2197
const char * Data() const
Definition TString.h:369
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition TString.cxx:1783
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition TString.h:692
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition TString.cxx:912
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition TString.cxx:2217
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition TString.h:615
Bool_t IsNull() const
Definition TString.h:407
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition TString.cxx:496
TString & Remove(Ssiz_t pos)
Definition TString.h:673
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2309
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:624
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition TString.h:639
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
Definition TSystem.cxx:555
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
Definition TSystem.cxx:1682
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:277
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1272
static Int_t GetErrno()
Static function returning system error number.
Definition TSystem.cxx:261
virtual int GetPid()
Get process id.
Definition TSystem.cxx:708
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1661
virtual int MakeDirectory(const char *name)
Make a directory.
Definition TSystem.cxx:826
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition TSystem.cxx:654
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition TSystem.cxx:565
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition TSystem.cxx:1294
virtual void ExitLoop()
Exit from event loop.
Definition TSystem.cxx:393
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition TSystem.cxx:861
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition TSystem.cxx:2493
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition TSystem.cxx:533
virtual const char * HostName()
Return the system's host name.
Definition TSystem.cxx:304
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
Definition TSystem.cxx:438
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition TSystem.cxx:1544
virtual TString GetDirName(const char *pathname)
Return the directory name in pathname.
Definition TSystem.cxx:1030
virtual int Unlink(const char *name)
Unlink, i.e.
Definition TSystem.cxx:1379
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition TSystem.cxx:1597
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition TTimer.cxx:211
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
Definition TXProofServ.h:30
void EnableTimeout()
Enable read timeout on the underlying socket.
TString fSockPath
Definition TXProofServ.h:35
Int_t Setup()
Print the ProofServ logo on standard output.
void Terminate(Int_t status)
Terminate the proof server.
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
Bool_t fTerminated
Definition TXProofServ.h:37
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual ~TXProofServ()
Cleanup.
void DisableTimeout()
Disable read timeout on the underlying socket.
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
TXProofServInterruptHandler * fInterruptHandler
Definition TXProofServ.h:33
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
TXSocketHandler * fInputHandler
Definition TXProofServ.h:34
Int_t CreateServer()
Finalize the server setup.
void HandleUrgentData()
Handle high priority data sent by the master or client.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'.
High level handler of connections to XProofD.
Definition TXSocket.h:59
static void SetLocation(const char *loc="")
Set location string.
Definition TXSocket.cxx:242
@ kStopProcess
Definition TXSocket.h:139
static TXSockPipe fgPipe
Definition TXSocket.h:111
Implementation of TXSocket using PF_UNIX sockets.
Long_t fMemVirtual
Definition TSystem.h:196
Long_t fMemResident
Definition TSystem.h:195
TString fUser
Definition TSystem.h:141
Int_t fInt2
Definition TXSocket.h:50
Int_t fInt4
Definition TXSocket.h:52
Int_t fInt1
Definition TXSocket.h:49
Int_t fInt3
Definition TXSocket.h:51