Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TProofServ.cxx
Go to the documentation of this file.
1// @(#)root/proof:$Id$
2// Author: Fons Rademakers 16/02/97
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TProofServ
13\ingroup proofkernel
14
15Class providing the PROOF server. It can act either as the master
16server or as a slave server, depending on its startup arguments. It
17receives and handles messages coming from the client or from the
18master server.
19
20*/
21
22#include "RConfigure.h"
23#include <ROOT/RConfig.hxx>
24#include "Riostream.h"
25
26#ifdef WIN32
27 #include <process.h>
28 #include <io.h>
29 #include "snprintf.h"
30 typedef long off_t;
31#endif
32#include <errno.h>
33#include <time.h>
34#include <fcntl.h>
35#include <sys/types.h>
36#include <sys/stat.h>
37#ifndef WIN32
38#include <sys/wait.h>
39#endif
40#include <cstdlib>
41
42// To handle exceptions
43#include <exception>
44#include <new>
45
46#if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
47 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
48 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
49#include <sys/file.h>
50#define lockf(fd, op, sz) flock((fd), (op))
51#ifndef F_LOCK
52#define F_LOCK (LOCK_EX | LOCK_NB)
53#endif
54#ifndef F_ULOCK
55#define F_ULOCK LOCK_UN
56#endif
57#endif
58
59#include "TProofServ.h"
60#include "TDSetProxy.h"
61#include "TEnv.h"
62#include "TError.h"
63#include "TEventList.h"
64#include "TEntryList.h"
65#include "TException.h"
66#include "TFile.h"
67#include "THashList.h"
68#include "TInterpreter.h"
69#include "TKey.h"
70#include "TMessage.h"
71#include "TVirtualPerfStats.h"
72#include "TProofDebug.h"
73#include "TProof.h"
74#include "TVirtualProofPlayer.h"
75#include "TProofQueryResult.h"
76#include "TQueryResultManager.h"
77#include "TRegexp.h"
78#include "TROOT.h"
79#include "TObjArray.h"
80#include "TSocket.h"
81#include "TStopwatch.h"
82#include "TSystem.h"
83#include "TTimeStamp.h"
84#include "TUrl.h"
85#include "TPluginManager.h"
86#include "TObjString.h"
87#include "compiledata.h"
89#include "TProofNodeInfo.h"
90#include "TFileInfo.h"
91#include "TClass.h"
92#include "TSQLServer.h"
93#include "TSQLResult.h"
94#include "TSQLRow.h"
95#include "TPRegexp.h"
96#include "TParameter.h"
97#include "TMap.h"
98#include "TSortedList.h"
99#include "TFileCollection.h"
100#include "TLockFile.h"
101#include "TDataSetManagerFile.h"
102#include "TProofProgressStatus.h"
103#include "TServerSocket.h"
104#include "TMonitor.h"
105#include "TProofOutputFile.h"
106#include "TSelector.h"
107#include "TPackMgr.h"
108
109// global proofserv handle
111
112// debug hook
113static volatile Int_t gProofServDebug = 1;
114
115// Syslog control
118TString TProofServ::fgSysLogEntity("undef:default");
119
120// File where to log: default stderr
122
123// Integrate with crash reporter.
124#ifdef __APPLE__
125extern "C" {
126static const char *__crashreporter_info__ = 0;
127asm(".desc ___crashreporter_info__, 0x10");
128}
129#endif
130
131// To control allowed actions while processing
133
134// Last message and entry before exceptions
137
138// Memory controllers
143
144// Async Logger
145static void SendAsynMsg(const char *msg) {
147}
148
149//----- Termination signal handler ---------------------------------------------
150////////////////////////////////////////////////////////////////////////////////
151
154public:
157 Bool_t Notify() override;
158};
159
160////////////////////////////////////////////////////////////////////////////////
161/// Handle this interrupt
162
164{
165 Printf("Received SIGTERM: terminating");
167 return kTRUE;
168}
169
170//----- Interrupt signal handler -----------------------------------------------
171////////////////////////////////////////////////////////////////////////////////
172
175public:
178 Bool_t Notify() override;
179};
180
181////////////////////////////////////////////////////////////////////////////////
182/// Handle this interrupt
183
185{
187 if (TROOT::Initialized()) {
188 Throw(GetSignal());
189 }
190 return kTRUE;
191}
192
193//----- SigPipe signal handler -------------------------------------------------
194////////////////////////////////////////////////////////////////////////////////
195
198public:
200 { fServ = s; }
201 Bool_t Notify() override;
202};
203
204////////////////////////////////////////////////////////////////////////////////
205/// Handle this signal
206
208{
210 return kTRUE;
211}
212
213//----- Input handler for messages from parent or master -----------------------
214////////////////////////////////////////////////////////////////////////////////
215
218public:
220 { fServ = s; }
221 Bool_t Notify() override;
222 Bool_t ReadNotify() override { return Notify(); }
223};
224
225////////////////////////////////////////////////////////////////////////////////
226/// Handle this input
227
229{
231 return kTRUE;
232}
233
234TString TProofServLogHandler::fgPfx = ""; // Default prefix to be prepended to messages
235Int_t TProofServLogHandler::fgCmdRtn = 0; // Return code of the command execution (available only
236 // after closing the pipe)
237////////////////////////////////////////////////////////////////////////////////
238/// Execute 'cmd' in a pipe and handle output messages from the related file
239
241 TSocket *s, const char *pfx)
242 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
243{
245 fgCmdRtn = 0;
246 fFile = 0;
247 if (s && cmd) {
248 fFile = gSystem->OpenPipe(cmd, "r");
249 if (fFile) {
250 SetFd(fileno(fFile));
251 // Notify what already in the file
252 Notify();
253 // Used in the destructor
255 } else {
256 fSocket = 0;
257 Error("TProofServLogHandler", "executing command in pipe");
258 fgCmdRtn = -1;
259 }
260 } else {
261 Error("TProofServLogHandler",
262 "undefined command (%p) or socket (%p)", (int *)cmd, s);
263 }
264}
265////////////////////////////////////////////////////////////////////////////////
266/// Handle available message from the open file 'f'
267
269 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
270{
272 fgCmdRtn = 0;
273 fFile = 0;
274 if (s && f) {
275 fFile = f;
276 SetFd(fileno(fFile));
277 // Notify what already in the file
278 Notify();
279 } else {
280 Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
281 }
282}
283////////////////////////////////////////////////////////////////////////////////
284/// Handle available message in the open file
285
287{
288 if (TestBit(kFileIsPipe) && fFile) {
290#ifdef WIN32
291 fgCmdRtn = rc;
292#else
293 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
294#endif
295 }
296 fFile = 0;
297 fSocket = 0;
299}
300////////////////////////////////////////////////////////////////////////////////
301/// Handle available message in the open file
302
304{
305 if (IsValid()) {
307 // Read buffer
308 char line[4096];
309 char *plf = 0;
310 while (fgets(line, sizeof(line), fFile)) {
311 if ((plf = strchr(line, '\n')))
312 *plf = 0;
313 // Create log string
314 TString log;
315 if (fPfx.Length() > 0) {
316 // Prepend prefix specific to this instance
317 log.Form("%s: %s", fPfx.Data(), line);
318 } else if (fgPfx.Length() > 0) {
319 // Prepend default prefix
320 log.Form("%s: %s", fgPfx.Data(), line);
321 } else {
322 // Nothing to prepend
323 log = line;
324 }
325 // Send the message one level up
326 m.Reset(kPROOF_MESSAGE);
327 m << log;
328 fSocket->Send(m);
329 }
330 }
331 return kTRUE;
332}
333////////////////////////////////////////////////////////////////////////////////
334/// Static method to set the default prefix
335
337{
   // Store 'pfx' in the class-wide default prefix fgPfx; Notify() prepends it
   // to forwarded lines only when the instance prefix fPfx is empty
338 fgPfx = pfx;
339}
340////////////////////////////////////////////////////////////////////////////////
341/// Static method to get the return code from the execution of a command via
342/// the pipe. This is always 0 when the log handler is not used with a pipe
343
345{
   // Return the class-wide fgCmdRtn; the constructors reset it to 0 and the
   // pipe-based one sets it to -1 when the pipe cannot be opened
346 return fgCmdRtn;
347}
348
349////////////////////////////////////////////////////////////////////////////////
350/// Init a guard for executing a command in a pipe
351
353 const char *pfx, Bool_t on)
354{
355 fExecHandler = 0;
356 if (cmd && on) {
357 fExecHandler = new TProofServLogHandler(cmd, s, pfx);
358 if (fExecHandler->IsValid()) {
360 } else {
361 Error("TProofServLogHandlerGuard","invalid handler");
362 }
363 } else {
364 if (on)
365 Error("TProofServLogHandlerGuard","undefined command");
366 }
367}
368
369////////////////////////////////////////////////////////////////////////////////
370/// Init a guard for executing a command in a pipe
371
373 const char *pfx, Bool_t on)
374{
375 fExecHandler = 0;
376 if (f && on) {
377 fExecHandler = new TProofServLogHandler(f, s, pfx);
378 if (fExecHandler->IsValid()) {
380 } else {
381 Error("TProofServLogHandlerGuard","invalid handler");
382 }
383 } else {
384 if (on)
385 Error("TProofServLogHandlerGuard","undefined file");
386 }
387}
388
389////////////////////////////////////////////////////////////////////////////////
390/// Close a guard for executing a command in a pipe
391
393{
397 }
398}
399
400//--- Special timer to control delayed shutdowns ----------------------------//
401////////////////////////////////////////////////////////////////////////////////
402/// Constructor
403
405 : TTimer(delay, kFALSE), fProofServ(p)
406{
   // Inactivity timeout read from the environment, defaulting to 20
407 fTimeout = gEnv->GetValue("ProofServ.ShutdownTimeout", 20);
408 // Backward compatibility: until 5.32 the variable was called ProofServ.ShutdonwTimeout
   // (the misspelled key is intentional; when set it overrides the value read above)
409 fTimeout = gEnv->GetValue("ProofServ.ShutdonwTimeout", fTimeout);
410}
411
412////////////////////////////////////////////////////////////////////////////////
413/// Handle expiration of the shutdown timer. In the case of low activity the
414/// process will be aborted.
415
417{
418 if (gDebug > 0)
419 printf("TShutdownTimer::Notify: checking activity on the input socket\n");
420
421 // Check activity on the socket
422 TSocket *xs = 0;
423 if (fProofServ && (xs = fProofServ->GetSocket())) {
424 TTimeStamp now;
425 TTimeStamp ts = xs->GetLastUsage();
426 Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
427 (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
428 if (dt > fTimeout * 60000) {
429 printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
430 " during the last %d mins: aborting\n", xs, fTimeout);
431 // At this point we lost our controller: we need to abort to avoid
432 // hidden timeouts or loops
433 gSystem->Abort();
434 } else {
435 if (gDebug > 0)
436 printf("TShutdownTimer::Notify: input socket: %p: show activity"
437 " %ld secs ago\n", xs, dt / 60000);
438 }
439 }
440 // Needed for the next shot
441 Reset();
442 return kTRUE;
443}
444
445//--- Synchronous timer used to reap children processes change of state ------//
446////////////////////////////////////////////////////////////////////////////////
447/// Destructor
448
450{
451 if (fChildren) {
453 delete fChildren;
454 fChildren = 0;
455 }
456}
457
458////////////////////////////////////////////////////////////////////////////////
459/// Add an entry for 'pid' in the internal list
460
462{
463 if (pid > 0) {
464 if (!fChildren)
465 fChildren = new TList;
466 TString spid;
467 spid.Form("%d", pid);
468 fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
469 TurnOn();
470 }
471}
472
473////////////////////////////////////////////////////////////////////////////////
474/// Check if any of the registered children has changed its state.
475/// Unregister those that are gone.
476
478{
479 if (fChildren) {
480 TIter nxp(fChildren);
481 TParameter<Int_t> *p = 0;
482 while ((p = (TParameter<Int_t> *)nxp())) {
483 int status;
484#ifndef WIN32
485 pid_t pid;
486 do {
487 pid = waitpid(p->GetVal(), &status, WNOHANG);
488 } while (pid < 0 && errno == EINTR);
489#else
490 intptr_t pid;
491 pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
492#endif
493 if (pid > 0 && pid == p->GetVal()) {
494 // Remove from the list
496 delete p;
497 }
498 }
499 }
500
501 // Stop the timer if no children
502 if (!fChildren || fChildren->GetSize() <= 0) {
503 Stop();
504 } else {
505 // Needed for the next shot
506 Reset();
507 }
508 return kTRUE;
509}
510
511//--- Special timer to terminate idle sessions ----------------------------//
512////////////////////////////////////////////////////////////////////////////////
513/// Handle expiration of the idle timer. The session will just be terminated.
514
516{
517 Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
518
519 if (fProofServ) {
520 // Set the status to timed-out
521 Int_t uss_rc = -1;
522 if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
523 Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
524 // Send a terminate request
525 TString msg;
526 if (fProofServ->GetProtocol() < 29) {
527 msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
528 "// Please IGNORE any error message possibly displayed below\n//",
530 } else {
531 msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
533 }
536 Reset();
537 Stop();
538 } else {
539 Warning("Notify", "fProofServ undefined!");
540 Start(-1, kTRUE);
541 }
542 return kTRUE;
543}
544
546
547// Hook to the constructor. This is needed to avoid using the plugin manager
548// which may create problems in multi-threaded environments.
549extern "C" {
550 TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
551 { return new TProofServ(argc, argv, flog); }
552}
553
554////////////////////////////////////////////////////////////////////////////////
555/// Main constructor. Create an application environment. The TProofServ
556/// environment provides an eventloop via inheritance of TApplication.
557/// Actual server creation work is done in CreateServer() to allow
558/// overloading.
559
560TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
561 : TApplication("proofserv", argc, argv, 0, -1)
562{
563 // If test and tty, we are done
564 Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
565 if (xtest) {
566 Printf("proofserv: command line testing: OK");
567 exit(0);
568 }
569
570 // Read session specific rootrc file
571 TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
572 : "session.rootrc";
574 gEnv->ReadFile(rcfile, kEnvChange);
575
576 // Upper limit on Virtual Memory (in kB)
577 fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
578 if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
579 Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
580 if (mmx < kMaxLong && mmx > 0)
581 fgVirtMemMax = mmx * 1024;
582 }
583 // Old variable for backward compatibility
584 if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
585 Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
586 if (mmx < kMaxLong && mmx > 0)
587 fgVirtMemMax = mmx * 1024;
588 }
589 // Upper limit on Resident Memory (in kB)
590 fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
591 if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
592 Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
593 if (mmx < kMaxLong && mmx > 0)
594 fgResMemMax = mmx * 1024;
595 }
596 // Thresholds for warnings and stop processing
597 fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
598 fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
599 if (fgVirtMemMax > 0 || fgResMemMax > 0) {
600 if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
601 Warning("TProofServ", "requested memory fraction threshold to stop processing"
602 " (MemStop) out of range [0,1] - ignoring");
603 fgMemStop = 0.95;
604 }
605 if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
606 Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
607 " (MemHWM) out of range [0,MemStop] - ignoring");
608 fgMemHWM = 0.80;
609 }
610 }
611
612 // Wait (loop) to allow debugger to connect
613 Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
614 if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
615 (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
616 while (gProofServDebug)
617 ;
618 }
619
620 // Test instance
621 if (argc && *argc >= 4)
622 if (!strcmp(argv[3], "test"))
623 fService = "prooftest";
624
625 // crude check on number of arguments
626 if (argc && *argc < 2) {
627 Error("TProofServ", "Must have at least 1 arguments (see proofd).");
628 exit(1);
629 }
630
631 // Set global to this instance
632 gProofServ = this;
633
634 // Log control flags
636
637 // Abort on higher than kSysError's and set error handler
639 SetErrorHandlerFile(stderr);
641
642 fNcmd = 0;
643 fGroupPriority = 100;
645 fProtocol = 0;
646 fOrdinal = gEnv->GetValue("ProofServ.Ordinal", "-1");
647 fGroupId = -1;
648 fGroupSize = 0;
649 fRealTime = 0.0;
650 fCpuTime = 0.0;
651 fProof = 0;
652 fPlayer = 0;
653 fSocket = 0;
654
655 fTotSessions = -1;
656 fActSessions = -1;
657 fEffSessions = -1.;
658 fPackMgr = 0;
659
660 fLogFile = flog;
661 fLogFileDes = -1;
662
663 fArchivePath = "";
664 // Init lockers
665 fCacheLock = 0;
666 fQueryLock = 0;
667
668 fQMgr = 0;
670 fIdle = kTRUE;
671 fQuerySeqNum = -1;
672
673 fQueuedMsg = new TList;
674
676
677 fShutdownTimer = 0;
678 fReaperTimer = 0;
679 fIdleTOTimer = 0;
680
681 fDataSetManager = 0; // Initialized in Setup()
682 fDataSetStgRepo = 0; // Initialized in Setup()
683
684 fInputHandler = 0;
685
686 // Quotas disabled by default
687 fMaxQueries = -1;
688 fMaxBoxSize = -1;
689 fHWMBoxSize = -1;
690
691 // Submerger quantities
692 fMergingSocket = 0;
693 fMergingMonitor = 0;
694 fMergedWorkers = 0;
695
696 // Bit to flag high-memory footprint
698
699 // Max message size
700 fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
701
702 // Message compression
703 fCompressMsg = gEnv->GetValue("ProofServ.CompressMessage", 0);
704
705 gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
707
709 if (gProofDebugLevel > 0)
710 Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
711
712 // Max log file size
713 fLogFileMaxSize = -1;
714 TString logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
715 if (!logmx.IsNull()) {
716 Long64_t xf = 1;
717 if (!logmx.IsDigit()) {
718 if (logmx.EndsWith("K")) {
719 xf = 1024;
720 logmx.Remove(TString::kTrailing, 'K');
721 } else if (logmx.EndsWith("M")) {
722 xf = 1024*1024;
723 logmx.Remove(TString::kTrailing, 'M');
724 } else if (logmx.EndsWith("G")) {
725 xf = 1024*1024*1024;
726 logmx.Remove(TString::kTrailing, 'G');
727 }
728 }
729 if (logmx.IsDigit()) {
730 fLogFileMaxSize = logmx.Atoi() * xf;
731 if (fLogFileMaxSize > 0)
732 Info("TProofServ", "keeping the log file size within %lld bytes", fLogFileMaxSize);
733 } else {
734 logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
735 Warning("TProofServ", "bad formatted log file size limit ignored: '%s'", logmx.Data());
736 }
737 }
738
739 // Parse options
740 GetOptions(argc, argv);
741
742 // Default prefix in the form '<role>-<ordinal>'
743 fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
744 if (test) fPrefix = "Test";
745 if (fOrdinal != "-1")
746 fPrefix += fOrdinal;
748
749 // Syslog control
750 TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
751 if (!(slog.IsNull())) {
752 if (slog.IsDigit()) {
753 fgLogToSysLog = slog.Atoi();
754 } else {
755 char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
756 c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
757 Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
758 (c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
759 if (dosyslog) {
760 slog.Remove(0,1);
761 if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
762 if (fgLogToSysLog <= 0)
763 Warning("TProofServ", "request for syslog logging ineffective!");
764 }
765 }
766 }
767 // Initialize proper service if required
768 if (fgLogToSysLog > 0) {
769 fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
770 if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
772 }
773
774 // Enable optimized sending of streamer infos to use embedded backward/forward
775 // compatibility support between different ROOT versions and different versions of
776 // users classes
777 Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
778 if (enableSchemaEvolution) {
780 } else {
781 Info("TProofServ", "automatic schema evolution in TMessage explicitly disabled");
782 }
783}
784
785////////////////////////////////////////////////////////////////////////////////
786/// Finalize the server setup. If master, create the TProof instance to talk
787/// to the worker or submaster nodes.
788/// Return 0 on success, -1 on error
789
791{
792 // Get socket to be used (setup in proofd)
793 TString opensock = gSystem->Getenv("ROOTOPENSOCK");
794 if (opensock.Length() <= 0)
795 opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
796 Int_t sock = opensock.Atoi();
797 if (sock <= 0) {
798 Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
799 return -1;
800 }
801 fSocket = new TSocket(sock);
802
803 // Set compression level, if any
805
806 // debug hooks
807 if (IsMaster()) {
808 // wait (loop) in master to allow debugger to connect
809 if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
810 while (gProofServDebug)
811 ;
812 }
813 } else {
814 // wait (loop) in slave to allow debugger to connect
815 if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
816 while (gProofServDebug)
817 ;
818 }
819 }
820
821 if (gProofDebugLevel > 0)
822 Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
824
825 if (Setup() != 0) {
826 // Setup failure
827 LogToMaster();
828 SendLogFile();
829 Terminate(0);
830 return -1;
831 }
832
833 // Set the default prefix in the form '<role>-<ordinal>' (it was already done
834 // in the constructor, but for standard PROOF the ordinal number is only set in
835 // Setup(), so we need to do it again here)
836 TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
837 pfx += GetOrdinal();
839
840 if (!fLogFile) {
842 // If for some reason we failed setting a redirection file for the logs
843 // we cannot continue
844 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
845 LogToMaster();
846 SendLogFile(-98);
847 Terminate(0);
848 return -1;
849 }
850 } else {
851 // Use the file already open by pmain
852 if ((fLogFileDes = fileno(fLogFile)) < 0) {
853 LogToMaster();
854 SendLogFile(-98);
855 Terminate(0);
856 return -1;
857 }
858 }
859
860 // Send message of the day to the client
861 if (IsMaster()) {
862 if (CatMotd() == -1) {
863 LogToMaster();
864 SendLogFile(-99);
865 Terminate(0);
866 return -1;
867 }
868 }
869
870 // Everybody expects std::iostream to be available, so load it...
871 ProcessLine("#include <iostream>", kTRUE);
872 ProcessLine("#include <string>",kTRUE); // for std::string std::iostream.
873
874 // The following libs are also useful to have, make sure they are loaded...
875 //gROOT->LoadClass("TMinuit", "Minuit");
876 //gROOT->LoadClass("TPostScript", "Postscript");
877
878 // Load user functions
879 const char *logon;
880 logon = gEnv->GetValue("Proof.Load", (char *)0);
881 if (logon) {
882 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
883 if (mac)
884 ProcessLine(TString::Format(".L %s", logon), kTRUE);
885 delete [] mac;
886 }
887
888 // Execute logon macro
889 logon = gEnv->GetValue("Proof.Logon", (char *)0);
890 if (logon && !NoLogOpt()) {
891 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
892 if (mac)
893 ProcessFile(logon);
894 delete [] mac;
895 }
896
897 // Save current interpreter context
898 gInterpreter->SaveContext();
899 gInterpreter->SaveGlobalsContext();
900
901 // Install interrupt and message input handlers
904 fInputHandler = new TProofServInputHandler(this, sock);
906
907 // if master, start slave servers
908 if (IsMaster()) {
909 TString master = "proof://__master__";
911 if (a.IsValid()) {
912 master += ":";
913 master += a.GetPort();
914 }
915
916 // Get plugin manager to load appropriate TProof from
917 TPluginManager *pm = gROOT->GetPluginManager();
918 if (!pm) {
919 Error("CreateServer", "no plugin manager found");
920 SendLogFile(-99);
921 Terminate(0);
922 return -1;
923 }
924
925 // Find the appropriate handler
926 TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
927 if (!h) {
928 Error("CreateServer", "no plugin found for TProof with a"
929 " config file of '%s'", fConfFile.Data());
930 SendLogFile(-99);
931 Terminate(0);
932 return -1;
933 }
934
935 // load the plugin
936 if (h->LoadPlugin() == -1) {
937 Error("CreateServer", "plugin for TProof could not be loaded");
938 SendLogFile(-99);
939 Terminate(0);
940 return -1;
941 }
942
943 // make instance of TProof
944 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
945 fConfFile.Data(),
946 GetConfDir(),
947 fLogLevel, 0));
948 if (!fProof || !fProof->IsValid()) {
949 Error("CreateServer", "plugin for TProof could not be executed");
951 SendLogFile(-99);
952 Terminate(0);
953 return -1;
954 }
955 // Find out if we are a master in direct contact only with workers
957
958 SendLogFile();
959 }
960
961 // Setup the shutdown timer
962 if (!fShutdownTimer) {
963 // Check activity on socket every 5 mins
964 fShutdownTimer = new TShutdownTimer(this, 300000);
966 }
967
968 // Check if schema evolution is effective: clients running versions <=17 do not
969 // support that: send a warning message
970 if (fProtocol <= 17) {
971 TString msg;
972 msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
973 " This may generate compatibility problems between streamed objects.\n"
974 " The advise is to move to ROOT >= 5.21/02 .");
975 SendAsynMessage(msg.Data());
976 }
977
978 // Setup the idle timer
979 if (IsMaster() && !fIdleTOTimer) {
980 // Check activity on socket every 5 mins
981 Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
982 if (idle_to > 0) {
983 fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
985 if (gProofDebugLevel > 0)
986 Info("CreateServer", " idle timer started (%d secs)", idle_to);
987 } else if (gProofDebugLevel > 0) {
988 Info("CreateServer", " idle timer not started (no idle timeout requested)");
989 }
990 }
991
992 // Done
993 return 0;
994}
995
996////////////////////////////////////////////////////////////////////////////////
997/// Cleanup. Not really necessary since after this dtor there is no
998/// life anyway.
999
1001{
1009 close(fLogFileDes);
1010}
1011
1012////////////////////////////////////////////////////////////////////////////////
1013/// Print message of the day (in the file pointed by the env PROOFMOTD
1014/// or from fConfDir/etc/proof/motd). The motd is not shown more than
1015/// once a day. If the file pointed by env PROOFNOPROOF exists (or the
1016/// file fConfDir/etc/proof/noproof exists), show its contents and close
1017/// the connection.
1018
1020{
1021 TString lastname;
1022 FILE *motd;
1023 Bool_t show = kFALSE;
1024
1025 // If we are disabled just print the message and close the connection
1026 TString motdname(GetConfDir());
1027 // The env variable PROOFNOPROOF allows to put the file in an alternative
1028 // location not overwritten by a new installation
1029 if (gSystem->Getenv("PROOFNOPROOF")) {
1030 motdname = gSystem->Getenv("PROOFNOPROOF");
1031 } else {
1032 motdname += "/etc/proof/noproof";
1033 }
1034 if ((motd = fopen(motdname, "r"))) {
1035 Int_t c;
1036 printf("\n");
1037 while ((c = getc(motd)) != EOF)
1038 putchar(c);
1039 fclose(motd);
1040 printf("\n");
1041
1042 return -1;
1043 }
1044
1045 // get last modification time of the file ~/proof/.prooflast
1046 lastname = TString(GetWorkDir()) + "/.prooflast";
1047 gSystem->ExpandPathName(lastname);
1048 Long64_t size;
1049 Long_t id, flags, modtime, lasttime = 0;
1050 if (gSystem->GetPathInfo(lastname.Data(), &id, &size, &flags, &lasttime) == 1)
1051 lasttime = 0;
1052
1053 // show motd at least once per day
1054 if (time(0) - lasttime > (time_t)86400)
1055 show = kTRUE;
1056
1057 // The env variable PROOFMOTD allows to put the file in an alternative
1058 // location not overwritten by a new installation
1059 if (gSystem->Getenv("PROOFMOTD")) {
1060 motdname = gSystem->Getenv("PROOFMOTD");
1061 } else {
1062 motdname = GetConfDir();
1063 motdname += "/etc/proof/motd";
1064 }
1065 if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
1066 if (modtime > lasttime || show) {
1067 if ((motd = fopen(motdname, "r"))) {
1068 Int_t c;
1069 printf("\n");
1070 while ((c = getc(motd)) != EOF)
1071 putchar(c);
1072 fclose(motd);
1073 printf("\n");
1074 }
1075 }
1076 }
1077
1078 if (lasttime)
1079 gSystem->Unlink(lastname.Data());
1080 Int_t fd = creat(lastname.Data(), 0600);
1081 if (fd >= 0) close(fd);
1082
1083 return 0;
1084}
1085
1086////////////////////////////////////////////////////////////////////////////////
1087/// Get object with name "name;cycle" (e.g. "aap;2") from master or client.
1088/// This method is called by TDirectory::Get() in case the object can not
1089/// be found locally.
1090
1091TObject *TProofServ::Get(const char *namecycle)
1092{
1093 if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
1094 Error("Get", "problems sending request");
1095 return (TObject *)0;
1096 }
1097
1098 TObject *idcur = 0;
1099
1100 Bool_t notdone = kTRUE;
1101 while (notdone) {
1102 TMessage *mess = 0;
1103 if (fSocket->Recv(mess) < 0)
1104 return 0;
1105 Int_t what = mess->What();
1106 if (what == kMESS_OBJECT) {
1107 idcur = mess->ReadObject(mess->GetClass());
1108 notdone = kFALSE;
1109 } else {
1110 Int_t xrc = HandleSocketInput(mess, kFALSE);
1111 if (xrc == -1) {
1112 Error("Get", "command %d cannot be executed while processing", what);
1113 } else if (xrc == -2) {
1114 Error("Get", "unknown command %d ! Protocol error?", what);
1115 }
1116 }
1117 delete mess;
1118 }
1119
1120 return idcur;
1121}
1122
1123////////////////////////////////////////////////////////////////////////////////
1124/// Reset the compute time
1125
1127{
1128 fCompute.Stop();
1129 if (fPlayer) {
1131 if (status) status->SetLearnTime(fCompute.RealTime());
1132 Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
1134 }
1136}
1137
1138////////////////////////////////////////////////////////////////////////////////
1139/// Get next range of entries to be processed on this server.
1140
1142{
1143 Long64_t bytesRead = 0;
1144
1145 if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
1146
1147 if (fCompute.Counter() > 0)
1148 fCompute.Stop();
1149
1151 Double_t cputime = fCompute.CpuTime();
1152 Double_t realtime = fCompute.RealTime();
1153
1154 if (fProtocol > 18) {
1155 req << fLatency.RealTime();
1156 TProofProgressStatus *status = 0;
1157 if (fPlayer) {
1159 status = fPlayer->GetProgressStatus();
1160 } else {
1161 Error("GetNextPacket", "no progress status object");
1162 return 0;
1163 }
1164 // the CPU and wallclock proc times are kept in the TProofServ and here
1165 // added to the status object in the fPlayer.
1166 if (status->GetEntries() > 0) {
1167 PDB(kLoop, 2) status->Print(GetOrdinal());
1168 status->IncProcTime(realtime);
1169 status->IncCPUTime(cputime);
1170 }
1171 // Flag cases with problems in opening files
1172 if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
1173 // Add to the message
1174 req << status;
1175 // Send tree cache information
1176 Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
1177 Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
1178 req << cacheSize << learnent;
1179
1180 // Send over the number of entries in the file, used by packetizers that do not rely
1181 // on initial validation. Also, -1 means that the file could not be opened, which is
1182 // used to flag files as missing
1183 req << totalEntries;
1184
1185 // Send the time spent in saving the partial result to file
1186 if (fProtocol > 34) req << fSaveOutput.RealTime();
1187
1188 PDB(kLoop, 1) {
1189 PDB(kLoop, 2) status->Print();
1190 Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
1191 }
1192 // Reset the status bits
1195 status = 0; // status is owned by the player.
1196 } else {
1197 req << fLatency.RealTime() << realtime << cputime
1198 << bytesRead << totalEntries;
1199 if (fPlayer)
1200 req << fPlayer->GetEventsProcessed();
1201 }
1202
1203 fLatency.Start();
1204 Int_t rc = fSocket->Send(req);
1205 if (rc <= 0) {
1206 Error("GetNextPacket","Send() failed, returned %d", rc);
1207 return 0;
1208 }
1209
1210 // Save the current output
1211 if (fPlayer) {
1214 Warning("GetNextPacket", "problems saving partial results");
1215 fSaveOutput.Stop();
1216 }
1217
1218 TDSetElement *e = 0;
1219 Bool_t notdone = kTRUE;
1220 while (notdone) {
1221
1222 TMessage *mess;
1223 if ((rc = fSocket->Recv(mess)) <= 0) {
1224 fLatency.Stop();
1225 Error("GetNextPacket","Recv() failed, returned %d", rc);
1226 return 0;
1227 }
1228
1229 Int_t xrc = 0;
1230 TString file, dir, obj;
1231
1232 Int_t what = mess->What();
1233
1234 switch (what) {
1235 case kPROOF_GETPACKET:
1236
1237 fLatency.Stop();
1238 (*mess) >> e;
1239 if (e != 0) {
1240 fCompute.Start();
1241 PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
1242 e->GetFileName(), e->GetDirectory(),
1243 e->GetObjName(), e->GetFirst(),e->GetNum());
1244 } else {
1245 PDB(kLoop, 2) Info("GetNextPacket", "Done");
1246 }
1247 notdone = kFALSE;
1248 break;
1249
1250 case kPROOF_STOPPROCESS:
1251 // if a kPROOF_STOPPROCESS message is returned to kPROOF_GETPACKET
1252 // GetNextPacket() will return 0 and the TPacketizer and hence
1253 // TEventIter will be stopped
1254 fLatency.Stop();
1255 PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
1256 break;
1257
1258 default:
1259 xrc = HandleSocketInput(mess, kFALSE);
1260 if (xrc == -1) {
1261 Error("GetNextPacket", "command %d cannot be executed while processing", what);
1262 } else if (xrc == -2) {
1263 Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
1264 }
1265 break;
1266 }
1267
1268 delete mess;
1269
1270 }
1271
1272 // Done
1273 return e;
1274}
1275
1276////////////////////////////////////////////////////////////////////////////////
1277/// Get and handle command line options. Fixed format:
1278/// "proofserv"|"proofslave" `<confdir>`
1279
1280void TProofServ::GetOptions(Int_t *argc, char **argv)
1281{
// Test mode is requested by passing "test" as the fourth argument (argv[3]).
1282 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
1283
1284 // If test and tty
// The check passes only when both stdin (fd 0) and stdout (fd 1) are
// terminals; in that case report success and exit with status 0.
1285 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1286 Printf("proofserv: command line testing: OK");
1287 exit(0);
1288 }
1289
// A null argc or fewer than two arguments means argv[1] is not available.
1290 if (!argc || (argc && *argc <= 1)) {
1291 Fatal("GetOptions", "Must be started from proofd with arguments");
1292 exit(1);
1293 }
1294
// argv[1] selects the role and must be exactly "proofserv" or "proofslave";
// anything else is a fatal startup error.
1295 if (!strcmp(argv[1], "proofserv")) {
1297 fEndMaster = kTRUE;
1298 } else if (!strcmp(argv[1], "proofslave")) {
1301 } else {
1302 Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
1303 exit(1);
1304 }
1305
// Remember the service name exactly as given on the command line.
1306 fService = argv[1];
1307
1308 // Confdir
// The configuration directory is read from the ROOTCONFDIR environment
// variable, which must be set.
1309 if (!(gSystem->Getenv("ROOTCONFDIR"))) {
1310 Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
1311 exit(1);
1312 }
1313 fConfDir = gSystem->Getenv("ROOTCONFDIR");
1314}
1315
1316////////////////////////////////////////////////////////////////////////////////
1317/// Handle input coming from the client or from the master server.
1318
1320{
1321 // The idle timeout guard: stops the timer and restarts when we return from here
1323
1324 Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
1325 fgRecursive++;
1326
1327 TMessage *mess;
1328 Int_t rc = 0;
1329 TString exmsg;
1330
1331 // Check log file length (before the action, so we have the chance to keep the
1332 // latest logs)
1334
1335 try {
1336
1337 // Get message
1338 if (fSocket->Recv(mess) <= 0 || !mess) {
1339 // Pending: do something more intelligent here
1340 // but at least get a message in the log file
1341 Error("HandleSocketInput", "retrieving message from input socket");
1342 Terminate(0);
1343 return;
1344 }
1345 Int_t what = mess->What();
1346 PDB(kCollect, 1)
1347 Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
1348
1349 fNcmd++;
1350
1351 if (fProof) fProof->SetActive();
1352
1353 Bool_t doit = kTRUE;
1354
1355 while (doit) {
1356
1357 // Process the message
1358 rc = HandleSocketInput(mess, all);
1359 if (rc < 0) {
1360 TString emsg;
1361 if (rc == -1) {
1362 emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
1363 } else if (rc == -3) {
1364 emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
1365 } else {
1366 emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
1367 }
1368 SendAsynMessage(emsg.Data());
1369 } else if (rc == 2) {
1370 // Add to the queue
1371 fQueuedMsg->Add(mess);
1372 PDB(kGlobal, 1)
1373 Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
1374 what, fQueuedMsg->GetSize());
1375 mess = 0;
1376 }
1377
1378 // Still something to do?
1379 doit = 0;
1380 if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
1381 // Take the next message from the queue and process it
1382 PDB(kCollect, 1)
1383 Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
1384 what, fQueuedMsg->GetSize());
1385 all = 1;
1386 SafeDelete(mess);
1387 mess = (TMessage *) fQueuedMsg->First();
1388 if (mess) fQueuedMsg->Remove(mess);
1389 doit = 1;
1390 }
1391 }
1392
1393 } catch (std::bad_alloc &) {
1394 // Memory allocation problem:
1395 exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s %lld",
1397 } catch (std::exception &exc) {
1398 // Standard exception caught
1399 exmsg.Form("caught standard exception '%s' %s %lld",
1400 exc.what(), fgLastMsg.Data(), fgLastEntry);
1401 } catch (int i) {
1402 // Other exception caught
1403 exmsg.Form("caught exception throwing %d %s %lld",
1404 i, fgLastMsg.Data(), fgLastEntry);
1405 } catch (const char *str) {
1406 // Other exception caught
1407 exmsg.Form("caught exception throwing '%s' %s %lld",
1408 str, fgLastMsg.Data(), fgLastEntry);
1409 } catch (...) {
1410 // Caught other exception
1411 exmsg.Form("caught exception <unknown> %s %lld",
1413 }
1414
1415 // Terminate on exception
1416 if (!exmsg.IsNull()) {
1417 // Save info in the log file too
1418 Error("HandleSocketInput", "%s", exmsg.Data());
1419 // Try to warn the user
1420 SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1421 // Terminate
1422 Terminate(0);
1423 }
1424
1425 // Terminate also if a high memory footprint was detected before the related
1426 // exception was thrown
1428 // Save info in the log file too
1429 exmsg.Form("high-memory footprint detected during Process(...) - terminating");
1430 Error("HandleSocketInput", "%s", exmsg.Data());
1431 // Try to warn the user
1432 SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1433 // Terminate
1434 Terminate(0);
1435 }
1436
1437 fgRecursive--;
1438
1439 if (fProof) {
1440 // If something went wrong during processing and we do not have
1441 // any workers anymore, we shut down this session
1442 Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
1443 Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
1445 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1446 SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
1447 Terminate(0);
1448 }
1450 // Reset PROOF to running state
1452 }
1453
1454 // Cleanup
1455 SafeDelete(mess);
1456}
1457
1458////////////////////////////////////////////////////////////////////////////////
1459/// Process input coming from the client or from the master server.
1460/// If 'all' is kFALSE, process only those messages that can be handled
1461/// during query processing.
1462/// Returns -1 if the message could not be processed, <-1 if something went
1463/// wrong. Returns 1 if the action may have changed the parallel state.
1464/// Returns 2 if the message has to be enqueued.
1465/// Returns 0 otherwise
1466
1468{
1469 static TStopwatch timer;
1470 char str[2048];
1471 Bool_t aborted = kFALSE;
1472
1473 if (!mess) return -3;
1474
1475 Int_t what = mess->What();
1476 PDB(kCollect, 1)
1477 Info("HandleSocketInput", "processing message type %d from '%s'",
1478 what, fSocket->GetTitle());
1479
1480 timer.Start();
1481
1482 Int_t rc = 0, lirc = 0;
1483 TString slb;
1484 TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
1485
1486 switch (what) {
1487
1488 case kMESS_CINT:
1489 if (all) {
1490 mess->ReadString(str, sizeof(str));
1491 // Make sure that the relevant files are available
1492 TString fn;
1493
1494 Bool_t hasfn = TProof::GetFileInCmd(str, fn);
1495
1496 if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
1497 fProof->SendCommand(str);
1498 } else {
1499 PDB(kGlobal, 1)
1500 Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
1501 TString ocwd;
1502 if (hasfn) {
1503 fCacheLock->Lock();
1504 ocwd = gSystem->WorkingDirectory();
1506 }
1507 ProcessLine(str);
1508 if (hasfn) {
1509 gSystem->ChangeDirectory(ocwd);
1510 fCacheLock->Unlock();
1511 }
1512 }
1513
1514 LogToMaster();
1515 } else {
1516 rc = -1;
1517 }
1518 SendLogFile();
1519 if (pslb) slb = str;
1520 break;
1521
1522 case kMESS_STRING:
1523 if (all) {
1524 mess->ReadString(str, sizeof(str));
1525 } else {
1526 rc = -1;
1527 }
1528 break;
1529
1530 case kMESS_OBJECT:
1531 if (all) {
1532 mess->ReadObject(mess->GetClass());
1533 } else {
1534 rc = -1;
1535 }
1536 break;
1537
1538 case kPROOF_GROUPVIEW:
1539 if (all) {
1540 mess->ReadString(str, sizeof(str));
1541 // coverity[secure_coding]
1542 sscanf(str, "%d %d", &fGroupId, &fGroupSize);
1543 } else {
1544 rc = -1;
1545 }
1546 break;
1547
1548 case kPROOF_LOGLEVEL:
1549 { UInt_t mask;
1550 mess->ReadString(str, sizeof(str));
1551 sscanf(str, "%d %u", &fLogLevel, &mask);
1552 Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
1555 if (levelchanged)
1556 Info("HandleSocketInput:kPROOF_LOGLEVEL", "debug level set to %d (mask: 0x%x)",
1558 if (IsMaster())
1560 }
1561 break;
1562
1563 case kPROOF_PING:
1564 { if (IsMaster())
1565 fProof->Ping();
1566 // do nothing (ping is already acknowledged)
1567 }
1568 break;
1569
1570 case kPROOF_PRINT:
1571 mess->ReadString(str, sizeof(str));
1572 Print(str);
1573 LogToMaster();
1574 SendLogFile();
1575 break;
1576
1577 case kPROOF_RESET:
1578 if (all) {
1579 mess->ReadString(str, sizeof(str));
1580 Reset(str);
1581 } else {
1582 rc = -1;
1583 }
1584 break;
1585
1586 case kPROOF_STATUS:
1587 Warning("HandleSocketInput:kPROOF_STATUS",
1588 "kPROOF_STATUS message is obsolete");
1590 Warning("HandleSocketInput:kPROOF_STATUS", "problem sending of request");
1591 break;
1592
1593 case kPROOF_GETSTATS:
1595 break;
1596
1597 case kPROOF_GETPARALLEL:
1598 SendParallel();
1599 break;
1600
1601 case kPROOF_STOP:
1602 if (all) {
1603 if (IsMaster()) {
1604 TString ord;
1605 *mess >> ord;
1606 PDB(kGlobal, 1)
1607 Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
1608 if (fProof) fProof->TerminateWorker(ord);
1609 } else {
1610 PDB(kGlobal, 1)
1611 Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
1612 Terminate(0);
1613 }
1614 } else {
1615 rc = -1;
1616 }
1617 break;
1618
1619 case kPROOF_STOPPROCESS:
1620 if (all) {
1621 // this message makes only sense when the query is being processed,
1622 // however the message can also be received if the user pressed
1623 // ctrl-c, so ignore it!
1624 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
1625 } else {
1626 Long_t timeout = -1;
1627 (*mess) >> aborted;
1628 if (fProtocol > 9)
1629 (*mess) >> timeout;
1630 PDB(kGlobal, 1)
1631 Info("HandleSocketInput:kPROOF_STOPPROCESS",
1632 "recursive mode: enter %d, %ld", aborted, timeout);
1633 if (fProof)
1634 // On the master: propagate further
1635 fProof->StopProcess(aborted, timeout);
1636 else
1637 // Worker: actually stop processing
1638 if (fPlayer)
1639 fPlayer->StopProcess(aborted, timeout);
1640 }
1641 break;
1642
1643 case kPROOF_PROCESS:
1644 {
1646 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
1647 HandleProcess(mess, pslb);
1648 // The log file is sent either in HandleProcess or HandleSubmergers.
1649 // The reason is that the order of the various messages depends on the
1650 // processing mode (sync/async) and/or merging mode
1651 }
1652 break;
1653
1654 case kPROOF_SENDOUTPUT:
1655 {
1656 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
1657 "worker was asked to send output to master");
1658 Int_t sorc = 0;
1659 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
1660 Error("HandleSocketInput:kPROOF_SENDOUTPUT", "problems sending output list");
1661 sorc = 1;
1662 }
1663 // Signal the master that we are idle
1665 SetIdle(kTRUE);
1666 DeletePlayer();
1667 SendLogFile(sorc);
1668 }
1669 break;
1670
1671 case kPROOF_QUERYLIST:
1672 {
1673 HandleQueryList(mess);
1674 // Notify
1675 SendLogFile();
1676 }
1677 break;
1678
1679 case kPROOF_REMOVE:
1680 {
1681 HandleRemove(mess, pslb);
1682 // Notify
1683 SendLogFile();
1684 }
1685 break;
1686
1687 case kPROOF_RETRIEVE:
1688 {
1689 HandleRetrieve(mess, pslb);
1690 // Notify
1691 SendLogFile();
1692 }
1693 break;
1694
1695 case kPROOF_ARCHIVE:
1696 {
1697 HandleArchive(mess, pslb);
1698 // Notify
1699 SendLogFile();
1700 }
1701 break;
1702
1703 case kPROOF_MAXQUERIES:
1704 { PDB(kGlobal, 1)
1705 Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
1707 m << fMaxQueries;
1708 fSocket->Send(m);
1709 // Notify
1710 SendLogFile();
1711 }
1712 break;
1713
1715 if (all) {
1716 PDB(kGlobal, 1)
1717 Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
1718 TString stag;
1719 (*mess) >> stag;
1720 if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
1721 Printf("Session %s cleaned up", stag.Data());
1722 } else {
1723 Printf("Could not cleanup session %s", stag.Data());
1724 }
1725 } else {
1726 rc = -1;
1727 }
1728 // Notify
1729 SendLogFile();
1730 break;
1731
1732 case kPROOF_GETENTRIES:
1733 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
1734 Bool_t isTree;
1736 TString dir;
1737 TString objname("undef");
1738 Long64_t entries = -1;
1739
1740 if (all) {
1741 (*mess) >> isTree >> filename >> dir >> objname;
1742 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1743 "Report size of object %s (%s) in dir %s in file %s",
1744 objname.Data(), isTree ? "T" : "O",
1745 dir.Data(), filename.Data());
1746 entries = TDSet::GetEntries(isTree, filename, dir, objname);
1747 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1748 "Found %lld %s", entries, isTree ? "entries" : "objects");
1749 } else {
1750 rc = -1;
1751 }
1753 answ << entries << objname;
1754 SendLogFile(); // in case of error messages
1755 fSocket->Send(answ);
1756 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
1757 }
1758 break;
1759
1760 case kPROOF_CHECKFILE:
1761 if (!all && fProtocol <= 19) {
1762 // Come back later
1763 rc = 2;
1764 } else {
1765 // Handle file checking request
1766 HandleCheckFile(mess, pslb);
1767 FlushLogFile(); // Avoid sending (error) messages at next action
1768 }
1769 break;
1770
1771 case kPROOF_SENDFILE:
1772 if (!all && fProtocol <= 19) {
1773 // Come back later
1774 rc = 2;
1775 } else {
1776 mess->ReadString(str, sizeof(str));
1777 Long_t size;
1778 Int_t bin, fw = 1;
1779 char name[1024];
1780 if (fProtocol > 5) {
1781 sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
1782 } else {
1783 sscanf(str, "%1023s %d %ld", name, &bin, &size);
1784 }
1785 TString fnam(name);
1786 Bool_t copytocache = kTRUE;
1787 if (fnam.BeginsWith("cache:")) {
1788 fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
1789 copytocache = kFALSE;
1790 }
1791
1792 Int_t rfrc = 0;
1793 if (size > 0) {
1794 rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
1795 } else {
1796 // Take it from the cache
1797 if (!fnam.BeginsWith(fCacheDir.Data())) {
1798 fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
1799 }
1800 }
1801 if (rfrc == 0) {
1802 // copy file to cache if not a PAR file
1803 if (copytocache && size > 0 && !fPackMgr->IsInDir(name))
1804 gSystem->Exec(TString::Format("%s %s %s", kCP, fnam.Data(), fCacheDir.Data()));
1805 if (IsMaster() && fw == 1) {
1807 if (bin)
1808 opt |= TProof::kBinary;
1809 PDB(kGlobal, 1)
1810 Info("HandleSocketInput","forwarding file: %s", fnam.Data());
1811 if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
1812 Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
1813 }
1814 }
1816 } else {
1817 // There was an error
1818 SendLogFile(1);
1819 }
1820 }
1821 break;
1822
1823 case kPROOF_LOGFILE:
1824 {
1825 Int_t start, end;
1826 (*mess) >> start >> end;
1827 PDB(kGlobal, 1)
1828 Info("HandleSocketInput:kPROOF_LOGFILE",
1829 "Logfile request - byte range: %d - %d", start, end);
1830
1831 LogToMaster();
1832 SendLogFile(0, start, end);
1833 }
1834 break;
1835
1836 case kPROOF_PARALLEL:
1837 if (all) {
1838 if (IsMaster()) {
1839 Int_t nodes;
1840 Bool_t random = kFALSE;
1841 (*mess) >> nodes;
1842 if ((mess->BufferSize() > mess->Length()))
1843 (*mess) >> random;
1844 if (fProof) fProof->SetParallel(nodes, random);
1845 rc = 1;
1846 }
1847 } else {
1848 rc = -1;
1849 }
1850 // Notify
1851 SendLogFile();
1852 break;
1853
1854 case kPROOF_CACHE:
1855 if (!all && fProtocol <= 19) {
1856 // Come back later
1857 rc = 2;
1858 } else {
1860 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
1861 Int_t hcrc = HandleCache(mess, pslb);
1862 // Notify
1863 SendLogFile(hcrc);
1864 }
1865 break;
1866
1867 case kPROOF_WORKERLISTS:
1868 { Int_t wlrc = -1;
1869 if (all) {
1870 if (IsMaster())
1871 wlrc = HandleWorkerLists(mess);
1872 else
1873 Warning("HandleSocketInput:kPROOF_WORKERLISTS",
1874 "Action meaning-less on worker nodes: protocol error?");
1875 } else {
1876 rc = -1;
1877 }
1878 // Notify
1879 SendLogFile(wlrc);
1880 }
1881 break;
1882
1884 if (all) {
1885 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
1886 if (IsMaster()) {
1887
1888 Bool_t ok = kTRUE;
1889 // if the session does not have workers and is in the dynamic mode
1890 if (fProof->UseDynamicStartup()) {
1891 ok = kFALSE;
1892 // get the list of workers and start them
1893 Int_t pc = 0;
1894 TList* workerList = new TList();
1895 EQueryAction retVal = GetWorkers(workerList, pc);
1896 if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
1897 Int_t ret = fProof->AddWorkers(workerList);
1898 if (ret < 0) {
1899 Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1900 "adding a list of worker nodes returned: %d", ret);
1901 }
1902 } else {
1903 Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1904 "getting list of worker nodes returned: %d", retVal);
1905 }
1906 ok = kTRUE;
1907 }
1908 if (ok) {
1909 TList *info = fProof->GetListOfSlaveInfos();
1911 answ << info;
1912 fSocket->Send(answ);
1913 // stop the workers
1915 }
1916 } else {
1918 TList *info = new TList;
1920 SysInfo_t si;
1921 gSystem->GetSysInfo(&si);
1922 wi->SetSysInfo(si);
1923 info->Add(wi);
1924 answ << (TList *)info;
1925 fSocket->Send(answ);
1926 info->SetOwner(kTRUE);
1927 delete info;
1928 }
1929
1930 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
1931 } else {
1933 answ << (TList *)0;
1934 fSocket->Send(answ);
1935 rc = -1;
1936 }
1937 break;
1938
1940 if (all) {
1941 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
1942
1944 if (p) {
1945 p->HandleGetTreeHeader(mess);
1946 delete p;
1947 } else {
1948 Error("HandleSocketInput:kPROOF_GETTREEHEADER", "could not create TProofPlayer instance!");
1949 }
1950
1951 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
1952 } else {
1954 answ << TString("Failed") << (TObject *)0;
1955 fSocket->Send(answ);
1956 rc = -1;
1957 }
1958 break;
1959
1961 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
1962 TList* outputList = 0;
1963 if (IsMaster()) {
1964 outputList = fProof->GetOutputList();
1965 if (!outputList)
1966 outputList = new TList();
1967 } else {
1968 outputList = new TList();
1969 if (fProof->GetPlayer()) {
1970 TList *olist = fProof->GetPlayer()->GetOutputList();
1971 TIter next(olist);
1972 TObject *o;
1973 while ( (o = next()) ) {
1974 outputList->Add(new TNamed(o->GetName(), ""));
1975 }
1976 }
1977 }
1978 outputList->SetOwner();
1980 answ << outputList;
1981 fSocket->Send(answ);
1982 delete outputList;
1983 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
1984 }
1985 break;
1986
1988 if (all) {
1989 PDB(kGlobal, 1)
1990 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
1991
1992 TDSet* dset = 0;
1993 (*mess) >> dset;
1994
1995 if (IsMaster()) fProof->ValidateDSet(dset);
1996 else dset->Validate();
1997
1999 answ << dset;
2000 fSocket->Send(answ);
2001 delete dset;
2002 PDB(kGlobal, 1)
2003 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
2004 } else {
2005 rc = -1;
2006 }
2007 // Notify
2008 SendLogFile();
2009 break;
2010
2011 case kPROOF_DATA_READY:
2012 if (all) {
2013 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
2015 if (IsMaster()) {
2016 Long64_t totalbytes = 0, bytesready = 0;
2017 Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
2018 answ << dataready << totalbytes << bytesready;
2019 } else {
2020 Error("HandleSocketInput:kPROOF_DATA_READY",
2021 "This message should not be sent to slaves");
2022 answ << kFALSE << Long64_t(0) << Long64_t(0);
2023 }
2024 fSocket->Send(answ);
2025 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
2026 } else {
2028 answ << kFALSE << Long64_t(0) << Long64_t(0);
2029 fSocket->Send(answ);
2030 rc = -1;
2031 }
2032 // Notify
2033 SendLogFile();
2034 break;
2035
2036 case kPROOF_DATASETS:
2037 { Int_t dsrc = -1;
2038 if (fProtocol > 16) {
2039 dsrc = HandleDataSets(mess, pslb);
2040 } else {
2041 Error("HandleSocketInput", "old client: no or incompatible dataset support");
2042 }
2043 SendLogFile(dsrc);
2044 }
2045 break;
2046
2047 case kPROOF_SUBMERGER:
2048 { HandleSubmerger(mess);
2049 }
2050 break;
2051
2053 if (all) {
2054 lirc = HandleLibIncPath(mess);
2055 } else {
2056 rc = -1;
2057 }
2058 // Notify the client
2059 if (lirc > 0) SendLogFile();
2060 break;
2061
2062 case kPROOF_REALTIMELOG:
2063 { Bool_t on;
2064 (*mess) >> on;
2065 PDB(kGlobal, 1)
2066 Info("HandleSocketInput:kPROOF_REALTIMELOG",
2067 "setting real-time logging %s", (on ? "ON" : "OFF"));
2068 fRealTimeLog = on;
2069 // Forward the request to lower levels
2070 if (IsMaster())
2072 }
2073 break;
2074
2075 case kPROOF_FORK:
2076 if (all) {
2077 HandleFork(mess);
2078 LogToMaster();
2079 } else {
2080 rc = -1;
2081 }
2082 SendLogFile();
2083 break;
2084
2086 if (all) {
2087 // This message resumes the session; should not come during processing.
2088
2089 if (WaitingQueries() == 0) {
2090 Error("HandleSocketInput", "no queries enqueued");
2091 break;
2092 }
2093
2094 // Similar to handle process
2095 // get the list of workers and start them
2096 TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
2097 Int_t pc = 0;
2098 EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
2099
2100 if (retVal == TProofServ::kQueryOK) {
2101 Int_t ret = 0;
2102 if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
2103 Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
2104 } else {
2105 ProcessNext(pslb);
2106 // Set idle
2107 SetIdle(kTRUE);
2108 // Signal the client that we are idle
2110 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
2111 m << waiting;
2112 fSocket->Send(m);
2113 }
2114 } else {
2115 if (retVal == TProofServ::kQueryStop) {
2116 Error("HandleSocketInput", "error getting list of worker nodes");
2117 } else if (retVal != TProofServ::kQueryEnqueued) {
2118 Warning("HandleSocketInput", "query was re-queued!");
2119 } else {
2120 Error("HandleSocketInput", "unexpected answer: %d", retVal);
2121 break;
2122 }
2123 }
2124
2125 }
2126 break;
2127
2128 case kPROOF_GOASYNC:
2129 { // The client requested to switch to asynchronous mode:
2130 // communicate the sequential number of the running query for later
2131 // identification, if any
2132 if (!IsIdle() && fPlayer) {
2133 // Get query currently being processed
2136 m << pq->GetSeqNum() << kFALSE;
2137 fSocket->Send(m);
2138 } else {
2139 // Idle or undefined: nothing to do; ignore
2140 SendAsynMessage("Processing request to go asynchronous:"
2141 " idle or undefined player - ignoring");
2142 }
2143 }
2144 break;
2145
2146 case kPROOF_ECHO:
2147 { // Echo request: an object has been sent along. If the object is a
2148 // string, it is simply echoed back to the client from the master
2149 // and each worker. Otherwise, the output of TObject::Print() is
2150 // sent. Received object is disposed after usage.
2151
2152 TObject *obj = mess->ReadObject(0x0); // class type ignored
2153
2154 if (IsMaster()) {
2155 // We are on master
2156 // dbTODO: forward on dynamic startup when wrks are up
2157 if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
2158 fProof->Echo(obj); // forward to lower layer
2159 }
2160 }
2161
2163 TString smsg;
2164
2165 if (obj->InheritsFrom(TObjString::Class())) {
2166 // It's a string: echo it
2167 smsg.Form("Echo response from %s:%s: %s",
2169 ((TObjString *)obj)->String().Data());
2170 }
2171 else {
2172 // Not a string: collect Print() output and send it
2173
2174 // Output to tempfile
2175 TString tmpfn = "echo-out-";
2176 FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
2177 if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
2178 Error("HandleSocketInput", "Can't redirect output");
2179 if (tf) {
2180 fclose(tf);
2181 gSystem->Unlink(tmpfn);
2182 }
2183 rc = -1;
2184 delete obj;
2185 break;
2186 }
2187 //cout << obj->ClassName() << endl;
2188 obj->Print();
2189 gSystem->RedirectOutput(0x0); // restore
2190 fclose(tf);
2191
2192 // Read file back and send it via message
2193 smsg.Form("*** Echo response from %s:%s ***\n",
2195 TMacro *fr = new TMacro();
2196 fr->ReadFile(tmpfn);
2197 TIter nextLine(fr->GetListOfLines());
2199 while (( line = (TObjString *)nextLine() )) {
2200 smsg.Append( line->String() );
2201 }
2202
2203 // Close the reader (TMacro) and remove file
2204 delete fr;
2205 gSystem->Unlink(tmpfn);
2206 }
2207
2208 // Send message and dispose object
2209 rmsg << smsg;
2210 GetSocket()->Send(rmsg);
2211 delete obj;
2212 }
2213 break;
2214
2215 default:
2216 Error("HandleSocketInput", "unknown command %d", what);
2217 rc = -2;
2218 break;
2219 }
2220
2221 fRealTime += (Float_t)timer.RealTime();
2222 fCpuTime += (Float_t)timer.CpuTime();
2223
2224 if (!(slb.IsNull()) || fgLogToSysLog > 1) {
2225 TString s;
2226 s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
2227 what, timer.RealTime(), timer.CpuTime(), slb.Data());
2229 }
2230
2231 // Done
2232 return rc;
2233}
2234
2235////////////////////////////////////////////////////////////////////////////////
2236/// Accept and merge results from a set of workers
2237
2239{
2240 TMessage *mess = new TMessage();
2241 Int_t mergedWorkers = 0;
2242
2243 PDB(kSubmerger, 1) Info("AcceptResults", "enter");
2244
2245 // Overall result of this procedure
2247
2248 fMergingMonitor = new TMonitor();
2250
2251 Int_t numworkers = 0;
2252 while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
2253
2255 if (!s) {
2256 Info("AcceptResults", "interrupt!");
2257 result = kFALSE;
2258 break;
2259 }
2260
2261 if (s == fMergingSocket) {
2262 // New incoming connection
2263 TSocket *sw = fMergingSocket->Accept();
2264 if (sw && sw != (TSocket *)(-1)) {
2265 fMergingMonitor->Add(sw);
2266
2267 PDB(kSubmerger, 2)
2268 Info("AcceptResults", "connection from a worker accepted on merger %s ",
2269 fOrdinal.Data());
2270 // All assigned workers are connected
2271 if (++numworkers >= connections)
2273 } else {
2274 PDB(kSubmerger, 1)
2275 Info("AcceptResults", "spurious signal found of merging socket");
2276 }
2277 } else {
2278 if (s->Recv(mess) < 0) {
2279 Error("AcceptResults", "problems receiving message");
2280 continue;
2281 }
2282 PDB(kSubmerger, 2)
2283 Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
2284 if (!mess) {
2285 Error("AcceptResults", "message received: %p ", mess);
2286 continue;
2287 }
2288 Int_t type = 0;
2289
2290 // Read output object(s) from the received message
2291 while ((mess->BufferSize() > mess->Length())) {
2292 (*mess) >> type;
2293
2294 PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
2295 if (type == 2) {
2296 mergedWorkers++;
2297 PDB(kSubmerger, 2)
2298 Info("AcceptResults",
2299 "a new worker has been mergerd. Total merged workers: %d",
2300 mergedWorkers);
2301 }
2302 TObject *o = mess->ReadObject(TObject::Class());
2303 if (mergerPlayer->AddOutputObject(o) == 1) {
2304 // Remove the object if it has been merged
2305 PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
2306 SafeDelete(o);
2307 } else
2308 PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
2309 }
2310 }
2311 }
2313
2315 Int_t size = sockets->GetSize();
2316 for (Int_t i =0; i< size; ++i){
2317 ((TSocket*)(sockets->At(i)))->Close();
2318 PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
2319 delete ((TSocket*)(sockets->At(i)));
2320 }
2321 delete sockets;
2322
2325
2326 PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
2327 return result;
2328}
2329
2330////////////////////////////////////////////////////////////////////////////////
2331/// Handle Out-Of-Band data sent by the master or client.
2332
2334{
2335 char oob_byte;
2336 Int_t n, nch, wasted = 0;
2337
2338 const Int_t kBufSize = 1024;
2339 char waste[kBufSize];
2340
2341 // Real-time notification of messages
2343
2344 PDB(kGlobal, 5)
2345 Info("HandleUrgentData", "handling oob...");
2346
2347 // Receive the OOB byte
2348 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
2349 if (n == -2) { // EWOULDBLOCK
2350 //
2351 // The OOB data has not yet arrived: flush the input stream
2352 //
2353 // In some systems (Solaris) regular recv() does not return upon
2354 // receipt of the oob byte, which makes the below call to recv()
2355 // block indefinitely if there are no other data in the queue.
2356 // FIONREAD ioctl can be used to check if there are actually any
2357 // data to be flushed. If not, wait for a while for the oob byte
2358 // to arrive and try to read it again.
2359 //
2361 if (nch == 0) {
2362 gSystem->Sleep(1000);
2363 continue;
2364 }
2365
2366 if (nch > kBufSize) nch = kBufSize;
2367 n = fSocket->RecvRaw(waste, nch);
2368 if (n <= 0) {
2369 Error("HandleUrgentData", "error receiving waste");
2370 break;
2371 }
2372 wasted = 1;
2373 } else {
2374 Error("HandleUrgentData", "error receiving OOB");
2375 return;
2376 }
2377 }
2378
2379 PDB(kGlobal, 5)
2380 Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
2381
2382 if (fProof) fProof->SetActive();
2383
2384 switch (oob_byte) {
2385
2387 Info("HandleUrgentData", "*** Hard Interrupt");
2388
2389 // If master server, propagate interrupt to slaves
2390 if (IsMaster())
2392
2393 // Flush input socket
2394 while (1) {
2395 Int_t atmark;
2396
2397 fSocket->GetOption(kAtMark, atmark);
2398
2399 if (atmark) {
2400 // Send the OOB byte back so that the client knows where
2401 // to stop flushing its input stream of obsolete messages
2402 n = fSocket->SendRaw(&oob_byte, 1, kOob);
2403 if (n <= 0)
2404 Error("HandleUrgentData", "error sending OOB");
2405 break;
2406 }
2407
2408 // find out number of bytes to read before atmark
2410 if (nch == 0) {
2411 gSystem->Sleep(1000);
2412 continue;
2413 }
2414
2415 if (nch > kBufSize) nch = kBufSize;
2416 n = fSocket->RecvRaw(waste, nch);
2417 if (n <= 0) {
2418 Error("HandleUrgentData", "error receiving waste (2)");
2419 break;
2420 }
2421 }
2422
2423 SendLogFile();
2424
2425 break;
2426
2428 Info("HandleUrgentData", "Soft Interrupt");
2429
2430 // If master server, propagate interrupt to slaves
2431 if (IsMaster())
2433
2434 if (wasted) {
2435 Error("HandleUrgentData", "soft interrupt flushed stream");
2436 break;
2437 }
2438
2439 Interrupt();
2440
2441 SendLogFile();
2442
2443 break;
2444
2446 Info("HandleUrgentData", "Shutdown Interrupt");
2447
2448 // If master server, propagate interrupt to slaves
2449 if (IsMaster())
2451
2452 Terminate(0);
2453
2454 break;
2455
2456 default:
2457 Error("HandleUrgentData", "unexpected OOB byte");
2458 break;
2459 }
2460
2462}
2463
2464////////////////////////////////////////////////////////////////////////////////
2465/// Called when the client is not alive anymore (i.e. when kKeepAlive
2466/// has failed).
2467
2469{
     // Reaction to a broken connection: a master first probes the client with a
     // ping and terminates only if that probe fails; a worker terminates at once.
     // NOTE(review): the listing numbers skip 2471 and 2481-2482, so statements
     // are missing from this extract at those positions.
2470 // Real-time notification of messages
2472
2473 if (IsMaster()) {
2474 // Check if we are here because the client is closed. Try to ping the client:
2475 // if that works, we are here because some slave died
2476 if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
2477 Info("HandleSigPipe", "keepAlive probe failed");
2478 // Tell slaves we are going to close since there is no client anymore
2479
     // NOTE(review): fProof is dereferenced without a null check here, whereas
     // other methods in this file test fProof first - confirm it cannot be null.
2480 fProof->SetActive();
2483 Terminate(0);
2484 }
2485 } else {
2486 Info("HandleSigPipe", "keepAlive probe failed");
2487 Terminate(0); // original remark: "will not return from here" - NOTE(review): Terminate() in this file ends with gSystem->ExitLoop(); confirm
2488 }
2489}
2490
2491////////////////////////////////////////////////////////////////////////////////
2492/// True if in parallel mode.
2493
2495{
2496 if (IsMaster() && fProof)
2497 return fProof->IsParallel() || fProof->UseDynamicStartup() ;
2498
2499 // false in case we are a slave
2500 return kFALSE;
2501}
2502
2503////////////////////////////////////////////////////////////////////////////////
2504/// Print status of slave server.
2505
2507{
     // On a worker (or when fProof is not set) a single line with the host name
     // is printed.
     // NOTE(review): listing line 2509, the statement executed on a master with
     // a valid fProof, is missing from this extract.
2508 if (IsMaster() && fProof)
2510 else
2511 Printf("This is worker %s", gSystem->HostName());
2512}
2513
2514////////////////////////////////////////////////////////////////////////////////
2515/// Redirect stdout to a log file. This log file will be flushed to the
2516/// client or master after each command.
2517
2518void TProofServ::RedirectOutput(const char *dir, const char *mode)
2519{
2520 char logfile[512];
2521
2522 TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
2523 if (IsMaster()) {
2524 snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
2525 } else {
2526 snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
2527 }
2528
2529 if ((freopen(logfile, mode, stdout)) == 0)
2530 SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
2531
2532 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2533 SysError("RedirectOutput", "could not redirect stderr");
2534
2535 if ((fLogFile = fopen(logfile, "r")) == 0)
2536 SysError("RedirectOutput", "could not open logfile '%s'", logfile);
2537
2538 // from this point on stdout and stderr are properly redirected
2539 if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
2540 Warning("RedirectOutput", "no way to tell master (or client) where"
2541 " to upload packages");
2542 }
2543}
2544
2545////////////////////////////////////////////////////////////////////////////////
2546/// Reset PROOF environment to be ready for execution of next command.
2547
2548void TProofServ::Reset(const char *dir)
2549{
     // Steps visible here: change gDirectory to `dir` (after normalising its
     // prefix), reset the interpreter, and empty the current directory unless
     // it is gROOT itself.
     // NOTE(review): listing line 2569 is missing from this extract, so the
     // last statement of the function is not shown.
2550 // First go to new directory. Check first that we got a reasonable path;
2551 // in PROOF-Lite it may not be the case
2552 TString dd(dir);
2553 if (!dd.BeginsWith("proofserv")) {
     // Replace everything before the first ':' with "proofserv"
2554 Int_t ic = dd.Index(":");
2555 if (ic != kNPOS)
2556 dd.Replace(0, ic, "proofserv");
2557 }
2558 gDirectory->cd(dd.Data());
2559
2560 // Clear interpreter environment.
2561 gROOT->Reset();
2562
2563 // Make sure current directory is empty (don't delete anything when
2564 // we happen to be in the ROOT memory only directory!?)
2565 if (gDirectory != gROOT) {
2566 gDirectory->Delete();
2567 }
2568
2570}
2571
2572////////////////////////////////////////////////////////////////////////////////
2573/// Receive a file, either sent by a client or a master server.
2574/// If bin is true it is a binary file, otherwise it is an ASCII
2575/// file and we need to check for Windows \\r tokens. Returns -1 in
2576/// case of error, 0 otherwise.
2577
2579{
2580 if (size <= 0) return 0;
2581
2582 // open file, overwrite already existing file
2583 Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2584 if (fd < 0) {
2585 SysError("ReceiveFile", "error opening file %s", file);
2586 return -1;
2587 }
2588
2589 const Int_t kMAXBUF = 16384; //32768 //16384 //65536;
2590 char buf[kMAXBUF], cpy[kMAXBUF];
2591
2592 Int_t left, r;
2593 Long64_t filesize = 0;
2594
2595 while (filesize < size) {
2596 left = Int_t(size - filesize);
2597 if (left > kMAXBUF)
2598 left = kMAXBUF;
2599 r = fSocket->RecvRaw(&buf, left);
2600 if (r > 0) {
2601 char *p = buf;
2602
2603 filesize += r;
2604 while (r) {
2605 Int_t w;
2606
2607 if (!bin) {
2608 Int_t k = 0, i = 0, j = 0;
2609 char *q;
2610 while (i < r) {
2611 if (p[i] == '\r') {
2612 i++;
2613 k++;
2614 }
2615 cpy[j++] = buf[i++];
2616 }
2617 q = cpy;
2618 r -= k;
2619 w = write(fd, q, r);
2620 } else {
2621 w = write(fd, p, r);
2622 }
2623
2624 if (w < 0) {
2625 SysError("ReceiveFile", "error writing to file %s", file);
2626 close(fd);
2627 return -1;
2628 }
2629 r -= w;
2630 p += w;
2631 }
2632 } else if (r < 0) {
2633 Error("ReceiveFile", "error during receiving file %s", file);
2634 close(fd);
2635 return -1;
2636 }
2637 }
2638
2639 close(fd);
2640
2641 if (chmod(file, 0644) != 0)
2642 Warning("ReceiveFile", "error setting mode 0644 on file %s", file);
2643
2644 return 0;
2645}
2646
2647////////////////////////////////////////////////////////////////////////////////
2648/// Main server eventloop.
2649
2651{
2652 // Setup the server
2653 if (CreateServer() == 0) {
2654
2655 // Run the main event loop
2656 TApplication::Run(retrn);
2657 }
2658}
2659
2660////////////////////////////////////////////////////////////////////////////////
2661/// Send log file to master.
2662/// If start > -1 send only bytes in the range from start to end,
2663/// if end <= start send everything from start.
2664
2666{
     // Sends pending log content over fSocket as a kPROOF_LOGFILE announcement
     // followed by raw chunks, then sends a closing message carrying `status`.
     // NOTE(review): listing lines 2677, 2717 and 2743 are missing from this
     // extract (the worker "case by case" statement, the body of the EINTR retry
     // loop and the declaration of `mess`).
2667 // Determine the number of bytes left to be read from the log file.
2668 fflush(stdout);
2669
2670 // On workers we do not send the logs to masters (to avoid duplication of
2671 // text) unless asked explicitly, e.g. after an Exec(...) request.
2672 if (!IsMaster()) {
2673 if (!fSendLogToMaster) {
2674 FlushLogFile();
2675 } else {
2676 // Decide case by case
2678 }
2679 }
2680
     // ltot: end offset of the stdout file; lnow: current read offset of
     // fLogFileDes; left: number of bytes to send (-1 means nothing to send)
2681 off_t ltot=0, lnow=0;
2682 Int_t left = -1;
2683 Bool_t adhoc = kFALSE;
2684
2685 if (fLogFileDes > -1) {
2686 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2687 lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
2688
2689 if (ltot >= 0 && lnow >= 0) {
2690 if (start > -1) {
     // Explicit range requested: read from `start`; an invalid or
     // out-of-range `end` falls back to the end of the log
2691 lseek(fLogFileDes, (off_t) start, SEEK_SET);
2692 if (end <= start || end > ltot)
2693 end = ltot;
2694 left = (Int_t)(end - start);
2695 if (end < ltot)
2696 left++;
2697 adhoc = kTRUE;
2698 } else {
     // Default: everything written since the last read position
2699 left = (Int_t)(ltot - lnow);
2700 }
2701 }
2702 }
2703
2704 if (left > 0) {
2705 if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
2706 SysError("SendLogFile", "error sending kPROOF_LOGFILE");
2707 return;
2708 }
2709
2710 const Int_t kMAXBUF = 32768; //16384 //65536;
2711 char buf[kMAXBUF];
2712 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2713 Int_t len;
2714 do {
2715 while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
2716 TSystem::GetErrno() == EINTR)
2718
2719 if (len < 0) {
2720 SysError("SendLogFile", "error reading log file");
2721 break;
2722 }
2723
     // When sending up to the end of the log and the chunk was read in full,
     // the last byte of the chunk is overwritten with a newline
2724 if (end == ltot && len == wanted)
2725 buf[len-1] = '\n';
2726
2727 if (fSocket->SendRaw(buf, len) < 0) {
2728 SysError("SendLogFile", "error sending log file");
2729 break;
2730 }
2731
2732 // Update counters
2733 left -= len;
2734 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2735
2736 } while (len > 0 && left > 0);
2737 }
2738
2739 // Restore initial position if partial send
2740 if (adhoc && lnow >=0 )
2741 lseek(fLogFileDes, lnow, SEEK_SET);
2742
     // Closing message: status plus the number of parallel nodes (1 on workers)
2744 if (IsMaster())
2745 mess << status << (fProof ? fProof->GetParallel() : 0);
2746 else
2747 mess << status << (Int_t) 1;
2748
2749 if (fSocket->Send(mess) < 0) {
2750 SysError("SendLogFile", "error sending kPROOF_LOGDONE");
2751 return;
2752 }
2753
2754 PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
2755}
2756
2757////////////////////////////////////////////////////////////////////////////////
2758/// Send statistics of slave server to master or client.
2759
2761{
     // Streams, in this order: bytes read, real time, CPU time, current working
     // directory, the PROOF work dir (only for protocol >= 4) and the image.
     // NOTE(review): listing line 2769 (declaration of `mess`) is missing from
     // this extract.
2762 Long64_t bytesread = TFile::GetFileBytesRead();
2763 Float_t cputime = fCpuTime, realtime = fRealTime;
2764 if (IsMaster()) {
     // On a master the byte and CPU counters come from the TProof instance
     // NOTE(review): fProof is used without a null check here - confirm.
2765 bytesread = fProof->GetBytesRead();
2766 cputime = fProof->GetCpuTime();
2767 }
2768
2770 TString workdir = gSystem->WorkingDirectory(); // expect TString on other side
2771 mess << bytesread << realtime << cputime << workdir;
2772 if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
2773 mess << TString(gProofServ->GetImage());
2774 fSocket->Send(mess);
2775}
2776
2777////////////////////////////////////////////////////////////////////////////////
2778/// Send number of parallel nodes to master or client.
2779
2781{
     // Sends the number of parallel nodes together with the `async` flag.
     // A worker always reports 1; a master reports fProof->GetParallel().
     // NOTE(review): listing lines 2786 and 2794 are missing from this extract
     // (the call announced by the first debug message and the declaration of
     // `mess`).
2782 Int_t nparallel = 0;
2783 if (IsMaster()) {
2784 PDB(kGlobal, 2)
2785 Info("SendParallel", "Will invoke AskParallel()");
2787 PDB(kGlobal, 2)
2788 Info("SendParallel", "Will invoke GetParallel()");
2789 nparallel = fProof->GetParallel();
2790 } else {
2791 nparallel = 1;
2792 }
2793
2795 mess << nparallel << async;
2796 fSocket->Send(mess);
2797}
2798
2799////////////////////////////////////////////////////////////////////////////////
2800/// Print the ProofServ logo on standard output.
2801/// Return 0 on success, -1 on failure
2802
2804{
     // Visible sequence: send a greeting string over fSocket, exchange protocol
     // levels, obtain user/ordinal/work dir (old authentication for protocol < 5,
     // a TMessage otherwise), let a master override the work dir from its config
     // file, build the session tag and session dir, then run SetupCommon().
     // Returns 0 on success, -1 on failure.
     // NOTE(review): several listing lines are missing from this extract (2867,
     // 2897, 2900, 2908, 2920, 2923, 2928, 2937, 2940, 2943), so some statements
     // referred to by the comments below are not shown.
2805 char str[512];
2806
2807 if (IsMaster()) {
2808 snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
2809 } else {
2810 snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
2811 }
2812
     // The send is considered successful only if the string plus its
     // terminating byte were reported as sent
2813 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
2814 Error("Setup", "failed to send proof server startup message");
2815 return -1;
2816 }
2817
2818 // exchange protocol level between client and master and between
2819 // master and slave
2820 Int_t what;
2821 if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
2822 Error("Setup", "failed to receive remote proof protocol");
2823 return -1;
2824 }
2825 if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
2826 Error("Setup", "failed to send local proof protocol");
2827 return -1;
2828 }
2829
2830 // If old version, setup authentication related stuff
2831 if (fProtocol < 5) {
2832 TString wconf;
2833 if (OldAuthSetup(wconf) != 0) {
2834 Error("Setup", "OldAuthSetup: failed to setup authentication");
2835 return -1;
2836 }
     // wconf is interpreted as the config file on a master and as the work
     // dir on a worker (protocol >= 4 only)
2837 if (IsMaster()) {
2838 fConfFile = wconf;
2839 fWorkDir.Form("~/%s", kPROOF_WorkDir);
2840 } else {
2841 if (fProtocol < 4) {
2842 fWorkDir.Form("~/%s", kPROOF_WorkDir);
2843 } else {
2844 fWorkDir = wconf;
2845 if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
2846 }
2847 }
2848 } else {
2849
2850 // Receive some useful information
2851 TMessage *mess;
2852 if ((fSocket->Recv(mess) <= 0) || !mess) {
2853 Error("Setup", "failed to receive ordinal and config info");
2854 return -1;
2855 }
2856 if (IsMaster()) {
2857 (*mess) >> fUser >> fOrdinal >> fConfFile;
2858 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2859 } else {
2860 (*mess) >> fUser >> fOrdinal >> fWorkDir;
2861 if (fWorkDir.IsNull())
2862 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2863 }
2864 // Set the correct prefix
2865 if (fOrdinal != "-1")
2866 fPrefix += fOrdinal;
2868 delete mess;
2869 }
2870
2871 if (IsMaster()) {
2872
2873 // strip off any prooftype directives
2874 TString conffile = fConfFile;
2875 conffile.Remove(0, 1 + conffile.Index(":"));
2876
2877 // parse config file to find working directory
2878 TProofResourcesStatic resources(fConfDir, conffile);
2879 if (resources.IsValid()) {
2880 if (resources.GetMaster()) {
2881 TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
2882 if (tmpWorkDir != "")
2883 fWorkDir = tmpWorkDir;
2884 }
2885 } else {
     // NOTE(review): the message below opens a parenthesis that is never closed
2886 Info("Setup", "invalid config file %s (missing or unreadable",
2887 resources.GetFileName().Data());
2888 }
2889 }
2890
2891 // Set $HOME and $PATH. The HOME directory was already set to the
2892 // user's home directory by proofd.
2893 gSystem->Setenv("HOME", gSystem->HomeDirectory());
2894
2895 // Add user name in case of non default workdir
2896 if (fWorkDir.BeginsWith("/") &&
2898 if (!fWorkDir.EndsWith("/"))
2899 fWorkDir += "/";
2901 if (u) {
2902 fWorkDir += u->fUser;
2903 delete u;
2904 }
2905 }
2906
2907 // Goto to the main PROOF working directory
2909 if (gProofDebugLevel > 0)
2910 Info("Setup", "working directory set to %s", fWorkDir.Data());
2911
2912 // host first name
2913 TString host = gSystem->HostName();
2914 if (host.Index(".") != kNPOS)
2915 host.Remove(host.Index("."));
2916
2917 // Session tag
     // Format: <ordinal>-<short host>-<seconds timestamp>-<pid>
2918 fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
2919 (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
2921
2922 // create session directory and make it the working directory
2924 if (IsMaster())
2925 fSessionDir += "/master-";
2926 else
2927 fSessionDir += "/slave-";
2929
2930 // Common setup
2931 if (SetupCommon() != 0) {
2932 Error("Setup", "common setup failed");
2933 return -1;
2934 }
2935
2936 // Incoming OOB should generate a SIGURG
2938
2939 // Send packets off immediately to reduce latency
2941
2942 // Check every two hours if client is still alive
2944
2945 // Done
2946 return 0;
2947}
2948
2949////////////////////////////////////////////////////////////////////////////////
2950/// Common part (between TProofServ and TXProofServ) of the setup phase.
2951/// Return 0 on success, -1 on error
2952
2954{
     // Visible stages: umask and PATH setup, work/cache/package/data directory
     // preparation, session and query directories, dataset manager creation on
     // masters, quota parsing, version flag, user variables and syslog entry.
     // Returns 0 on success, -1 on error.
     // NOTE(review): many listing lines are missing from this extract (the
     // numbers at the start of the lines show the gaps), so several conditions
     // and declarations referred to below are not shown.
2955 // deny write access for group and world
2956 gSystem->Umask(022);
2957
2958#ifdef R__UNIX
2959 // Add bindir to PATH
2960 TString path(gSystem->Getenv("PATH"));
2961 TString bindir(TROOT::GetBinDir());
2962 // Augment PATH, if required
2963 // ^<compiler>, <compiler>, ^<sysbin>, <sysbin>
     // A leading '^' selects the part inserted in front of PATH (bindir);
     // without it the entry is appended to PATH
2964 TString paths = gEnv->GetValue("ProofServ.BinPaths", "");
2965 if (paths.Length() > 0) {
2966 Int_t icomp = 0;
2967 if (paths.Contains("^<compiler>"))
2968 icomp = 1;
2969 else if (paths.Contains("<compiler>"))
2970 icomp = -1;
2971 if (icomp != 0) {
2972# ifdef COMPILER
2973 TString compiler = COMPILER;
2974 if (compiler.Index("is ") != kNPOS)
2975 compiler.Remove(0, compiler.Index("is ") + 3);
2976 compiler = gSystem->GetDirName(compiler);
2977 if (icomp == 1) {
2978 if (!bindir.IsNull()) bindir += ":";
2979 bindir += compiler;
2980 } else if (icomp == -1) {
2981 if (!path.IsNull()) path += ":";
2982 path += compiler;
2983 }
2984#endif
2985 }
2986 Int_t isysb = 0;
2987 if (paths.Contains("^<sysbin>"))
2988 isysb = 1;
2989 else if (paths.Contains("<sysbin>"))
2990 isysb = -1;
2991 if (isysb != 0) {
2992 if (isysb == 1) {
2993 if (!bindir.IsNull()) bindir += ":";
2994 bindir += "/bin:/usr/bin:/usr/local/bin";
2995 } else if (isysb == -1) {
2996 if (!path.IsNull()) path += ":";
2997 path += "/bin:/usr/bin:/usr/local/bin";
2998 }
2999 }
3000 }
3001 // Final insert
3002 if (!bindir.IsNull()) bindir += ":";
3003 path.Insert(0, bindir);
3004 gSystem->Setenv("PATH", path);
3005#endif
3006
3010 Error("SetupCommon", "can not change to PROOF directory %s",
3011 fWorkDir.Data());
3012 return -1;
3013 }
3014 } else {
3019 Error("SetupCommon", "can not change to PROOF directory %s",
3020 fWorkDir.Data());
3021 return -1;
3022 }
3023 }
3024 }
3025
3026 // Set group
3027 fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
3028
3029 // Check and make sure "cache" directory exists
3030 fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
3035 if (gProofDebugLevel > 0)
3036 Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
3037 fCacheLock =
3038 new TProofLockPath(TString::Format("%s/%s%s",
3040 TString(fCacheDir).ReplaceAll("/","%").Data()));
3041 // Make also sure the cache path is in the macro path
3043
3044 // Check and make sure "packages" directory exists
3045 TString packdir = gEnv->GetValue("ProofServ.PackageDir",
3047 ResolveKeywords(packdir);
3048 if (gSystem->AccessPathName(packdir))
3049 gSystem->mkdir(packdir, kTRUE);
3050 fPackMgr = new TPackMgr(packdir);
3052 // Notification message
3053 TString noth;
3054 const char *k = (IsMaster()) ? "Mst" : "Wrk";
3055 noth.Form("%s-%s", k, fOrdinal.Data());
3056 fPackMgr->SetPrefix(noth.Data());
3057 if (gProofDebugLevel > 0)
3058 Info("SetupCommon", "package directory set to %s", packdir.Data());
3059
3060 // Check and make sure "data" directory exists
3061 fDataDir = gEnv->GetValue("ProofServ.DataDir","");
3062 Ssiz_t isep = kNPOS;
3063 if (fDataDir.IsNull()) {
3064 // Use default
3065 fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
3066 } else if ((isep = fDataDir.Last(' ')) != kNPOS) {
     // Text after the last blank is split off into fDataDirOpts
3067 fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
3068 fDataDir.Remove(isep);
3069 }
3072 if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
3073 Warning("SetupCommon", "problems creating path '%s' (errno: %d)",
3075 }
3076 if (gProofDebugLevel > 0)
3077 Info("SetupCommon", "data directory set to %s", fDataDir.Data());
3078
3079 // Check and apply possible options
3080 TString dataDirOpts = gEnv->GetValue("ProofServ.DataDirOpts","");
3081 if (!dataDirOpts.IsNull()) {
3082 // Do they apply to this server type
     // 'M' enables the options on masters, 'W' on workers
3083 Bool_t doit = kTRUE;
3084 if ((IsMaster() && !dataDirOpts.Contains("M")) ||
3085 (!IsMaster() && !dataDirOpts.Contains("W"))) doit = kFALSE;
3086 if (doit) {
3087 // Get the wanted mode
     // Default 0755; 'g' selects 0775; 'a' or 'o' selects 0777
3088 UInt_t m = 0755;
3089 if (dataDirOpts.Contains("g")) m = 0775;
3090 if (dataDirOpts.Contains("a") || dataDirOpts.Contains("o")) m = 0777;
3091 if (gProofDebugLevel > 0)
3092 Info("SetupCommon", "requested mode for data directories is '%o'", m);
3093 // Loop over paths
     // Walks fDataDir component by component; chmod is attempted only on
     // components whose uid and gid match those of this process
3094 FileStat_t st;
3095 TString p, subp;
3096 Int_t from = 0;
3097 if (fDataDir.BeginsWith("/")) p = "/";
3098 while (fDataDir.Tokenize(subp, from, "/")) {
3099 if (subp.IsNull()) continue;
3100 p += subp;
3101 if (gSystem->GetPathInfo(p, st) == 0) {
3102 if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
3103 if (gSystem->Chmod(p.Data(), m) != 0) {
3104 Warning("SetupCommon", "problems setting mode '%o' on path '%s' (errno: %d)",
3105 m, p.Data(), TSystem::GetErrno());
3106 break;
3107 }
3108 }
3109 p += "/";
3110 } else {
3111 Warning("SetupCommon", "problems stat-ing path '%s' (errno: %d; datadir: %s)",
3112 p.Data(), TSystem::GetErrno(), fDataDir.Data());
3113 break;
3114 }
3115 }
3116 }
3117 }
3118
3119 // List of directories where to look for global packages
3120 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
3121
3122 ResolveKeywords(globpack);
3123 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
3124 Info("SetupCommon", " %d global package directories registered", nglb);
3125 FlushLogFile();
3126
3127 // Check the session dir
3133 Error("SetupCommon", "can not change to working directory '%s'",
3134 fSessionDir.Data());
3135 return -1;
3136 }
3137 }
3138 gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
3139 if (gProofDebugLevel > 0)
3140 Info("SetupCommon", "session dir is '%s'", fSessionDir.Data());
3141
3142 // On masters, check and make sure that "queries" and "datasets"
3143 // directories exist
3144 if (IsMaster()) {
3145
3146 // Make sure that the 'queries' dir exist
3152 fQueryDir += TString("/session-") + fTopSessionTag;
3155 if (gProofDebugLevel > 0)
3156 Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
3157
3158 // Create 'queries' locker instance and lock it
3159 fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
3162 TString(fQueryDir).ReplaceAll("/","%").Data()));
3163 fQueryLock->Lock();
3164 // Create the query manager
3166 fQueryLock, 0);
3167 }
3168
3169 // Server image
3170 fImage = gEnv->GetValue("ProofServ.Image", "");
3171
3172 // Get the group priority
3173 if (IsMaster()) {
3174 // Send session tag to client
3176 m << fTopSessionTag << fGroup << fUser;
3177 fSocket->Send(m);
3178 // Group priority
3180 // Dataset manager instance via plug-in
3181 TPluginHandler *h = 0;
3182 TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
3183 if (!dsms.IsNull()) {
3184 TString dsm;
3185 Int_t from = 0;
3186 while (dsms.Tokenize(dsm, from, ",")) {
3188 Warning("SetupCommon", "a valid dataset manager already initialized");
3189 Warning("SetupCommon", "support for multiple managers not yet available");
3190 break;
3191 }
3192 // Get plugin manager to load the appropriate TDataSetManager
3193 if (gROOT->GetPluginManager()) {
3194 // Find the appropriate handler
3195 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
3196 if (h && h->LoadPlugin() != -1) {
3197 // make instance of the dataset manager
3199 reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
3200 fUser.Data(), dsm.Data()));
3201 }
3202 }
3203 }
3204 // Check the result of the dataset manager initialization
3206 Warning("SetupCommon", "dataset manager plug-in initialization failed");
3207 SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3209 }
3210 } else {
3211 // Initialize the default dataset manager
3212 TString opts("Av:");
3213 TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
3214 if (dsetdir.IsNull()) {
3215 // Use the default in the sandbox
3216 dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
3219 opts += "Sb:";
3220 }
3221 // Find the appropriate handler
3222 if (!h) {
3223 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
3224 if (h && h->LoadPlugin() == -1) h = 0;
3225 }
3226 if (h) {
3227 // make instance of the dataset manager
3228 TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
3229 fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
3230 fGroup.Data(), fUser.Data(), oo.Data()));
3231 }
3233 Warning("SetupCommon", "default dataset manager plug-in initialization failed");
3235 }
3236 }
3237 // Dataset manager for staging requests
3238 TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
3239 if (!dsReqCfg.IsNull()) {
3240 TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
3241
3242 if (reReqDir.Match(dsReqCfg) == 5) {
3243 TString dsDirFmt;
3244 dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
3245 fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_",
3246 dsDirFmt);
3247 if (fDataSetStgRepo &&
3249 Warning("SetupCommon",
3250 "failed init of dataset staging requests repository");
3252 }
3253 } else {
3254 Warning("SetupCommon",
3255 "specify, with [dir:]<path>, a valid path for staging requests");
3256 }
3257 } else if (gProofDebugLevel > 0) {
3258 Warning("SetupCommon", "no repository for staging requests available");
3259 }
3260 }
3261
3262 // Quotas
     // Lookup order: per-user setting, then per-group, then the global default
3263 TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
3264 if (quotas.IsNull())
3265 quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
3266 if (quotas.IsNull())
3267 quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
3268 if (!quotas.IsNull()) {
3269 // Parse it; format ("maxquerykept=10 hwmsz=800m maxsz=1g")
3270 TString tok;
3271 Ssiz_t from = 0;
3272 while (quotas.Tokenize(tok, from, " ")) {
3273 // Set max number of query results to keep
3274 if (tok.BeginsWith("maxquerykept=")) {
3275 tok.ReplaceAll("maxquerykept=","");
3276 if (tok.IsDigit())
3277 fMaxQueries = tok.Atoi();
3278 else
3279 Info("SetupCommon",
3280 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3281 }
3282 // Set High-Water-Mark or max on the sandbox size
3283 const char *ksz[2] = {"hwmsz=", "maxsz="};
3284 for (Int_t j = 0; j < 2; j++) {
3285 if (tok.BeginsWith(ksz[j])) {
3286 tok.ReplaceAll(ksz[j],"");
3287 Long64_t fact = -1;
3288 if (!tok.IsDigit()) {
3289 // Parse (k, m, g)
     // NOTE(review): if the token ends with none of "k", "m", "g" the loop
     // below never sets `fact` and keeps incrementing `i` past the three
     // entries of `s` (out-of-bounds read), while `ki` keeps being multiplied
     // by 1024 - needs a bound on `i`.
3290 tok.ToLower();
3291 const char *s[3] = {"k", "m", "g"};
3292 Int_t i = 0, ki = 1024;
3293 while (fact < 0) {
3294 if (tok.EndsWith(s[i++]))
3295 fact = ki;
3296 else
3297 ki *= 1024;
3298 }
3299 tok.Remove(tok.Length()-1);
3300 }
3301 if (tok.IsDigit()) {
3302 if (j == 0)
3303 fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3304 else
3305 fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3306 } else {
     // ssz is the key name without its trailing '='
3307 TString ssz(ksz[j], strlen(ksz[j])-1);
3308 Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
3309 }
3310 }
3311 }
3312 }
3313 }
3314
3315 // Apply quotas, if any
3316 if (IsMaster() && fQMgr)
3318 Warning("SetupCommon", "problems applying fMaxQueries");
3319
3320 // Send "ROOTversion|ArchCompiler" flag
     // The string built here is "<version>:<git commit>[:<version tag>]"
3321 if (fProtocol > 12) {
3322 TString vac = gROOT->GetVersion();
3323 vac += TString::Format(":%s", gROOT->GetGitCommit());
3324 TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
3325 if (rtag.Length() > 0)
3326 vac += TString::Format(":%s", rtag.Data());
3329 m << vac;
3330 fSocket->Send(m);
3331 }
3332
3333 // Set user vars in TProof
     // PROOF_ALLVARS is read as a comma-separated list of names
3334 TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
3335 TString name;
3336 Int_t from = 0;
3337 while (all_vars.Tokenize(name, from, ",")) {
3338 if (!name.IsNull()) {
3341 }
3342 }
3343
3344 if (fgLogToSysLog > 0) {
3345 // Set the syslog entity (all the information is available now)
3346 if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
3347 fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
3348 } else if (!(fUser.IsNull()) && fGroup.IsNull()) {
3349 fgSysLogEntity.Form("%s:default", fUser.Data());
3350 } else if (fUser.IsNull() && !(fGroup.IsNull())) {
3351 fgSysLogEntity.Form("undef:%s", fGroup.Data());
3352 }
3353 // Log the beginning of this session
3354 TString s;
3355 s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
3357 }
3358
3359 if (gProofDebugLevel > 0)
3360 Info("SetupCommon", "successfully completed");
3361
3362 // Done
3363 return 0;
3364}
3365
3366////////////////////////////////////////////////////////////////////////////////
3367/// Terminate the proof server.
3368
3370{
     // Visible steps: optional syslog record, memory footprint notification,
     // cleanup of session/queries/data directories, removal of the input
     // handlers, and finally a request to leave the event loop.
     // NOTE(review): many listing lines are missing from this extract (3374,
     // 3381, 3387, 3390, 3397, 3400, 3403, 3412-3413, 3419, 3424), so the
     // removal calls themselves are not shown.
3371 if (fgLogToSysLog > 0) {
3372 TString s;
3373 s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
3375 }
3376
3377 // Notify the memory footprint
3378 ProcInfo_t pi;
3379 if (!gSystem->GetProcInfo(&pi)){
3380 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3382 }
3383
3384 // Cleanup session directory
     // Done only for a clean exit (status == 0)
3385 if (status == 0) {
3386 // make sure we remain in a "connected" directory
3388 // needed in case fSessionDir is on NFS ?!
3389 gSystem->MakeDirectory(fSessionDir+"/.delete");
3391 }
3392
3393 // Cleanup queries directory if empty
3394 if (IsMaster()) {
     // "Empty" means: no query manager, no query list, or a list of size 0
3395 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
3396 // make sure we remain in a "connected" directory
3398 // needed in case fQueryDir is on NFS ?!
3399 gSystem->MakeDirectory(fQueryDir+"/.delete");
3401 // Remove lock file
3402 if (fQueryLock)
3404 }
3405
3406 // Unlock the query dir owned by this session
3407 if (fQueryLock)
3408 fQueryLock->Unlock();
3409 }
3410
3411 // Cleanup data directory if empty
3414 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
3415 }
3416
3417 // Remove input handler to avoid spurious signals in socket
3418 // selection for closing activities executed upon exit()
3420 TObject *fh = 0;
3421 while ((fh = next())) {
     // Only handlers of type TProofServInputHandler are concerned
3422 TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
3423 if (ih)
3425 }
3426
3427 // Stop processing events
3428 gSystem->ExitLoop();
3429
3430 // Exit() is called in pmain
3431}
3432
3433////////////////////////////////////////////////////////////////////////////////
3434/// Scan recursively the datadir and unlink it if empty
3435/// Return kTRUE if it can be unlinked, kFALSE otherwise
3436
3438{
3439 if (!path || strlen(path) <= 0) return kFALSE;
3440
3441 Bool_t dorm = kTRUE;
3442 void *dirp = gSystem->OpenDirectory(path);
3443 if (dirp) {
3444 TString fpath;
3445 const char *ent = 0;
3446 while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
3447 if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
3448 fpath.Form("%s/%s", path, ent);
3449 FileStat_t st;
3450 if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
3451 dorm = UnlinkDataDir(fpath);
3452 } else {
3453 dorm = kFALSE;
3454 }
3455 }
3456 // Close the directory
3457 gSystem->FreeDirectory(dirp);
3458 } else {
3459 // Cannot open the directory
3460 dorm = kFALSE;
3461 }
3462
3463 // Do remove, if required
3464 if (dorm && gSystem->Unlink(path) != 0)
3465 Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
3466 // done
3467 return dorm;
3468}
3469
3470////////////////////////////////////////////////////////////////////////////////
3471/// Static function that returns kTRUE in case we are a PROOF server.
3472
3474{
3475 return gProofServ ? kTRUE : kFALSE;
3476}
3477
3478////////////////////////////////////////////////////////////////////////////////
3479/// Static function returning pointer to global object gProofServ.
3480/// Mainly for use via CINT, where the gProofServ symbol might be
3481/// deleted from the symbol table.
3482
3484{
     // Plain accessor: hands back the global gProofServ as is (it may be unset)
3485 return gProofServ;
3486}
3487
3488////////////////////////////////////////////////////////////////////////////////
3489/// Setup authentication related stuff for old versions.
3490/// Provided for backward compatibility.
3491
3493{
3494 OldProofServAuthSetup_t oldAuthSetupHook = 0;
3495
3496 if (!oldAuthSetupHook) {
3497 // Load libraries needed for (server) authentication ...
3498 TString authlib = "libRootAuth";
3499 char *p = 0;
3500 // The generic one
3501 if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
3502 delete[] p;
3503 if (gSystem->Load(authlib) == -1) {
3504 Error("OldAuthSetup", "can't load %s",authlib.Data());
3505 return kFALSE;
3506 }
3507 } else {
3508 Error("OldAuthSetup", "can't locate %s",authlib.Data());
3509 return -1;
3510 }
3511 //
3512 // Locate OldProofServAuthSetup
3513 Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
3514 if (f)
3515 oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
3516 else {
3517 Error("OldAuthSetup", "can't find OldProofServAuthSetup");
3518 return -1;
3519 }
3520 }
3521 //
3522 // Setup
3523 return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
3524 fUser, fOrdinal, conf);
3525}
3526
3527////////////////////////////////////////////////////////////////////////////////
3528/// Create a TProofQueryResult instance for this query.
3529
3531 const char *opt,
3532 TList *inlist, Long64_t fst,
3533 TDSet *dset, const char *selec,
3534 TObject *elist)
3535{
     // Builds a TProofQueryResult from the arguments and returns it to the
     // caller. The V3-streamer bit of `dset`, if set, is switched off while
     // the object is created and switched back on afterwards.
     // NOTE(review): listing lines 3539 and 3552 are missing from this extract
     // (the statement preceding SeqNum() and the one setting the title).
3536 // Increment sequential number
     // seqnum stays -1 when there is no query manager
3537 Int_t seqnum = -1;
3538 if (fQMgr) {
3540 seqnum = fQMgr->SeqNum();
3541 }
3542
3543 // Locally we always use the current streamer
3544 Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
3545 if (olds)
3546 dset->SetWriteV3(kFALSE);
3547
3548 // Create the instance and add it to the list
3549 TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
3550 fst, dset, selec, elist);
3551 // Title is the session identifier
3553
3554 // Restore old streamer info
3555 if (olds)
3556 dset->SetWriteV3(kTRUE);
3557
3558 return pqr;
3559}
3560
3561////////////////////////////////////////////////////////////////////////////////
3562/// Set query in running state.
///
/// Records the current end offset of stdout as the start-of-query position in
/// the log, collects the enabled packages from fPackMgr and calls
/// SetRunning() and SetProcessInfo() on `pq`.
/// When fProof is set, the parallelism passed to SetRunning() is
/// fProof->GetParallel(); otherwise it is -1.
/// `pq` and fPackMgr are dereferenced without a null check.
3563
3565{
3566 // Record current position in the log file at start
// Flush first so that the offset accounts for everything printed so far
3567 fflush(stdout);
// NOTE(review): the lseek() result is stored in an Int_t -- confirm the log
// cannot grow beyond what an Int_t can hold.
3568 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3569
3570 // Add some header to logs
3571 Printf(" ");
3572 Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
3573
3574 // Build the list of loaded PAR packages
3575 TString parlist = "";
3576 fPackMgr->GetEnabledPackages(parlist);
3577
3578 if (fProof) {
3579 // Set in running state
3580 pq->SetRunning(startlog, parlist, fProof->GetParallel());
3581
3582 // Bytes and CPU at start (we will calculate the differential at end)
3583 pq->SetProcessInfo(pq->GetEntries(),
3585 } else {
3586 // Set in running state
3587 pq->SetRunning(startlog, parlist, -1);
3588
3589 // Bytes and CPU at start (we will calculate the differential at end)
3590 pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
3591 }
3592}
3593
3594////////////////////////////////////////////////////////////////////////////////
3595/// Handle archive request.
///
/// Reads a query reference and a destination path from `mess`; when `slb` is
/// non-null it is filled with "<queryref> <path>".
///  - If the reference is "Default", `path` is stored in fArchivePath and
///    nothing else is done.
///  - Otherwise the query is looked up through fQMgr; if it is not found (or
///    its index is negative) the first TProofQueryResult key found in
///    "<qdir>/query-result.root" is read instead.
///  - When `path` is empty it is derived from fArchivePath; if that is empty
///    too, the request is ignored.
///  - The query is flagged as archived and written to the archive file, which
///    is opened in "NEW" mode when gSystem->AccessPathName(path) is true and
///    in "UPDATE" mode otherwise.
/// Problems are only reported via Info(); nothing is returned.
3596
3598{
3599 PDB(kGlobal, 1)
3600 Info("HandleArchive", "Enter");
3601
3602 TString queryref;
3603 TString path;
3604 (*mess) >> queryref >> path;
3605
3606 if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
3607
3608 // If this is a set default action just save the default
3609 if (queryref == "Default") {
3610 fArchivePath = path;
3611 Info("HandleArchive",
3612 "default path set to %s", fArchivePath.Data());
3613 return;
3614 }
3615
3616 Int_t qry = -1;
3617 TString qdir;
3618 TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
// Remember the instance returned by the query manager (possibly null): 'pqr'
// may be replaced below by a copy read from file.
3619 TProofQueryResult *pqm = pqr;
3620
3621 if (path.Length() <= 0) {
3622 if (fArchivePath.Length() <= 0) {
3623 Info("HandleArchive",
3624 "archive paths are not defined - do nothing");
3625 return;
3626 }
3627 if (qry > 0) {
3628 path.Form("%s/session-%s-%d.root",
3630 } else {
// Derive the file name from the reference itself: ":q" becomes "-", the
// default archive directory is prepended and ".root" appended
3631 path = queryref;
3632 path.ReplaceAll(":q","-");
3633 path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
3634 path += ".root";
3635 }
3636 }
3637
3638 // Build file name for specific query
3639 if (!pqr || qry < 0) {
3640 TString fout = qdir;
3641 fout += "/query-result.root";
3642
3643 TFile *f = TFile::Open(fout,"READ");
3644 pqr = 0;
3645 if (f) {
3646 f->ReadKeys();
3647 TIter nxk(f->GetListOfKeys());
3648 TKey *k = 0;
// Take the first key of class TProofQueryResult that can be read
3649 while ((k = (TKey *)nxk())) {
3650 if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
3651 pqr = (TProofQueryResult *) f->Get(k->GetName());
3652 if (pqr)
3653 break;
3654 }
3655 }
3656 f->Close();
3657 delete f;
3658 } else {
3659 Info("HandleArchive",
3660 "file cannot be open (%s)",fout.Data());
3661 return;
3662 }
3663 }
3664
3665 if (pqr) {
3666
3667 PDB(kGlobal, 1) Info("HandleArchive",
3668 "archive path for query #%d: %s",
3669 qry, path.Data());
3670 TFile *farc = 0;
3671 if (gSystem->AccessPathName(path))
3672 farc = TFile::Open(path,"NEW");
3673 else
3674 farc = TFile::Open(path,"UPDATE");
3675 if (!farc || !(farc->IsOpen())) {
3676 Info("HandleArchive",
3677 "archive file cannot be open (%s)",path.Data());
3678 return;
3679 }
3680 farc->cd();
3681
3682 // Update query status
3683 pqr->SetArchived(path);
3684 if (pqm)
3685 pqm->SetArchived(path);
3686
3687 // Write to file
3688 pqr->Write();
3689
3690 // Update temporary files too
3691 if (qry > -1 && fQMgr)
3692 fQMgr->SaveQuery(pqr);
3693
3694 // Notify
3695 Info("HandleArchive",
3696 "results of query %s archived to file %s",
3697 queryref.Data(), path.Data());
// NOTE(review): 'farc' is neither closed nor deleted in this block, and a
// 'pqr' obtained from the query-result file is not deleted either --
// confirm who is expected to release them.
3698 }
3699
3700 // Done
3701 return;
3702}
3703
3704////////////////////////////////////////////////////////////////////////////////
3705/// Get a map {server-name, list-of-files} for collection 'fc' to be used in
3706/// TPacketizerFile. Returns a pointer to the map (ownership of the caller).
3707/// Or (TMap *)0 and an error message in emsg.
///
/// The key of each map entry is a TObjString of the form
/// "<protocol>://<host-fqdn>[:<port>]" (the port is appended only when
/// positive), built from the current URL of each file; the value is a
/// THashList holding the TFileInfo objects of 'fc' that map to that key.
/// 'emsg' is always reset to an empty string on entry.
3708
3710{
3711 TMap *fcmap = 0;
3712 emsg = "";
3713
3714 // Sanity checks
3715 if (!fc) {
3716 emsg.Form("file collection undefined!");
3717 return fcmap;
3718 }
3719
3720 // Prepare data set map
3721 fcmap = new TMap();
3722
3723 TIter nxf(fc->GetList());
3724 TFileInfo *fiind = 0;
3725 TString key;
3726 while ((fiind = (TFileInfo *)nxf())) {
// NOTE(review): 'xurl' is dereferenced without a null check -- confirm that
// GetCurrentUrl() cannot return null for the entries of 'fc'.
3727 TUrl *xurl = fiind->GetCurrentUrl();
3728 // Find the key for this server
3729 key.Form("%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
3730 if (xurl->GetPort() > 0)
3731 key += TString::Format(":%d", xurl->GetPort());
3732 // Get the map entry for this key
3733 TPair *ent = 0;
3734 THashList* l = 0;
3735 if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
3736 // Attach to the list
3737 l = (THashList *) ent->Value();
3738 } else {
3739 // Create list
3740 l = new THashList;
// NOTE(review): the per-server list is made owner of objects that are also
// held by fc->GetList() -- confirm that they cannot be deleted twice.
3741 l->SetOwner(kTRUE);
3742 // Add it to the map
3743 fcmap->Add(new TObjString(key.Data()), l);
3744 }
3745 // Add fileinfo with index to list
3746 l->Add(fiind);
3747 }
3748
3749 // Done
3750 return fcmap;
3751}
3752
3753////////////////////////////////////////////////////////////////////////////////
3754/// Handle processing request.
///
/// Reads from `mess`, in this order: the data set, the selector file name, the
/// input list, the options, the number of entries, the first entry, an event
/// list and the `sync` flag; an entry list is read in addition when the
/// message holds more data and fProtocol > 14. If both lists are present the
/// entry list wins and the event list is deleted.
///
/// On the top master the request is wrapped into a TProofQueryResult, saved
/// and queued; workers are requested via GetWorkers() and, if the session is
/// idle, the waiting queries are run through ProcessNext().
/// On any other node the request is run directly through fPlayer, the number
/// of processed events is reported on fSocket and the output is then sent
/// (or announced) according to the "controlled send output" / submerging
/// settings found in the input list and in gEnv.
///
/// `slb`, when non-null, is passed to ProcessNext() on the top master and is
/// filled with "<exit status> <entries> <bytes read>" elsewhere.
/// Nodes other than the top master return immediately, before reading
/// anything from `mess`, when they are not idle.
3755
3757{
3758 PDB(kGlobal, 1)
3759 Info("HandleProcess", "Enter");
3760
3761 // Nothing to do for slaves if we are not idle
3762 if (!IsTopMaster() && !IsIdle())
3763 return;
3764
3765 TDSet *dset;
3766 TString filename, opt;
3767 TList *input;
3768 Long64_t nentries, first;
3769 TEventList *evl = 0;
3770 TEntryList *enl = 0;
3771 Bool_t sync;
3772
3773 (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
3774 // Get entry list information, if any (support started with fProtocol == 15)
3775 if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
3776 (*mess) >> enl;
// A missing data set and a data set flagged TDSet::kEmpty are treated alike
3777 Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
3778
3779 // Priority to the entry list
3780 TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
3781 if (enl && evl)
3782 // Cannot specify both at the same time
3783 SafeDelete(evl);
3784 if ((!hasNoData) && elist)
3785 dset->SetEntryList(elist);
3786
3787 if (IsTopMaster()) {
3788
3789 TString emsg;
3790 // Make sure the dataset contains the information needed
3791 if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
3792 if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
3793 SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
3794 fPrefix.Data(), emsg.Data()));
3795 Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
3796 // To terminate collection
3797 if (sync) SendLogFile();
3798 return;
3799 }
3800 } else if (hasNoData) {
3801 // Check if we are required to process with TPacketizerFile a registered dataset
// NOTE(review): 'input' is dereferenced here without a null check
3802 TNamed *ftp = dynamic_cast<TNamed *>(input->FindObject("PROOF_FilesToProcess"));
3803 if (ftp) {
3804 TString dsn(ftp->GetTitle());
// Only titles without ':' or starting with "dataset:" are taken as
// dataset names to be resolved here
3805 if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
3806 dsn.ReplaceAll("dataset:", "");
3807 // Get the map for TPacketizerFile
3808 // Make sure we have something in input and a dataset manager
3809 if (!fDataSetManager) {
3810 emsg.Form("dataset manager not initialized!");
3811 } else {
3812 TFileCollection *fc = 0;
3813 // Get the dataset
3814 if (!(fc = fDataSetManager->GetDataSet(dsn))) {
3815 emsg.Form("requested dataset '%s' does not exists", dsn.Data());
3816 } else {
3817 TMap *fcmap = GetDataSetNodeMap(fc, emsg);
3818 if (fcmap) {
// Replace the named placeholder with the map, under the same name
3819 input->Remove(ftp);
3820 delete ftp;
3821 fcmap->SetOwner(kTRUE);
3822 fcmap->SetName("PROOF_FilesToProcess");
3823 input->Add(fcmap);
3824 }
3825 }
3826 }
3827 if (!emsg.IsNull()) {
3828 SendAsynMessage(TString::Format("HandleProcess on %s: %s",
3829 fPrefix.Data(), emsg.Data()));
3830 Error("HandleProcess", "%s", emsg.Data());
3831 // To terminate collection
3832 if (sync) SendLogFile();
3833 return;
3834 }
3835 }
3836 }
3837 }
3838
3839 TProofQueryResult *pq = 0;
3840
3841 // Create instance of query results; we set ownership of the input list
3842 // to the TQueryResult object, to avoid too many instantiations
// The input list, data set and entry list are passed as null here: they are
// handed over right below through SetInputList()
3843 pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
3844
3845 // Prepare the input list and transfer it into the TQueryResult object
3846 if (dset) input->Add(dset);
3847 if (elist) input->Add(elist);
3848 pq->SetInputList(input, kTRUE);
3849
3850 // Clear the list
3851 input->Clear("nodelete");
3853
3854 // Save input data, if any
3855 if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
3856 Warning("HandleProcess", "could not save input data: %s", emsg.Data());
3857
3858 // If not a draw action add the query to the main list
3859 if (!(pq->IsDraw())) {
3860 if (fQMgr) {
3861 if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
3862 // Also save it to queries dir
3863 fQMgr->SaveQuery(pq);
3864 }
3865 }
3866
3867 // Add anyhow to the waiting lists
3868 QueueQuery(pq);
3869
3870 // Call get Workers
3871 // if we are not idle the scheduler will just enqueue the query and
3872 // send a resume message later.
3873
3874 Bool_t enqueued = kFALSE;
3875 Int_t pc = 0;
3876 // if the session does not have workers and is in the dynamic mode
3877 if (fProof->UseDynamicStartup()) {
3878 // get a list of workers and start them
// NOTE(review): 'workerList' is not deleted on the early returns below, nor
// when the query is enqueued; whether AddWorkers() takes ownership cannot be
// told from this block -- confirm.
3879 TList* workerList = new TList();
3880 EQueryAction retVal = GetWorkers(workerList, pc);
3881 if (retVal == TProofServ::kQueryStop) {
3882 Error("HandleProcess", "error getting list of worker nodes");
3883 // To terminate collection
3884 if (sync) SendLogFile();
3885 return;
3886 } else if (retVal == TProofServ::kQueryEnqueued) {
3887 // change to an asynchronous query
3888 enqueued = kTRUE;
3889 Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3890 } else {
3891 Int_t ret = fProof->AddWorkers(workerList);
3892 if (ret < 0) {
3893 Error("HandleProcess", "Adding a list of worker nodes returned: %d",
3894 ret);
3895 // To terminate collection
3896 if (sync) SendLogFile();
3897 return;
3898 }
3899 }
3900 } else {
3901 EQueryAction retVal = GetWorkers(0, pc);
3902 if (retVal == TProofServ::kQueryStop) {
3903 Error("HandleProcess", "error getting list of worker nodes");
3904 // To terminate collection
3905 if (sync) SendLogFile();
3906 return;
3907 } else if (retVal == TProofServ::kQueryEnqueued) {
3908 // change to an asynchronous query
3909 enqueued = kTRUE;
3910 Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3911 } else if (retVal != TProofServ::kQueryOK) {
3912 Error("HandleProcess", "unknown return value: %d", retVal);
3913 // To terminate collection
3914 if (sync) SendLogFile();
3915 return;
3916 }
3917 }
3918
3919 // If the client submission was asynchronous, signal the submission of
3920 // the query and communicate the assigned sequential number for later
3921 // identification
3923 if (!sync || enqueued) {
3924 m << pq->GetSeqNum() << kFALSE;
3925 fSocket->Send(m);
3926 }
3927
3928 // Nothing more to do if we are not idle
3929 if (!IsIdle()) {
3930 // Notify submission
3931 Info("HandleProcess",
3932 "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
3933 return;
3934 }
3935
3936 // Process
3937 // in the static mode, if a session is enqueued it will be processed after current query
3938 // (there is no way to enqueue if idle).
3939 // in the dynamic mode we will process here only if the session was idle and got workers!
3940 Bool_t doprocess = kFALSE;
3941 while (WaitingQueries() > 0 && !enqueued) {
3942 doprocess = kTRUE;
3943 //
3944 ProcessNext(slb);
3945 // avoid processing async queries sent during processing in dyn mode
3947 enqueued = kTRUE;
3948
3949 } // Loop on submitted queries
3950
3951 // Set idle
3952 SetIdle(kTRUE);
3953
3954 // Reset mergers
3956
3957 // kPROOF_SETIDLE sets the client to idle; in asynchronous mode clients monitor
3958 // TProof::IsIdle to check the readiness of a query, so we need to send the log
3959 // file first, to be sure that everything about a query has been received by the client
3960 if (!sync) SendLogFile();
3961
3962 // Signal the client that we are idle
3963 if (doprocess) {
3964 m.Reset(kPROOF_SETIDLE);
3965 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
3966 m << waiting;
3967 fSocket->Send(m);
3968 }
3969
3970 // In synchronous mode TProof::Collect is terminated by the reception of the
3971 // log file and subsequent submissions are controlled by TProof::IsIdle(), so
3972 // this must be last one to be sent
3973 if (sync) SendLogFile();
3974
3975 // Set idle
3976 SetIdle(kTRUE);
3977
3978 } else {
3979
3980 // Reset compute stopwatch: we include all what done from now on
3981 fCompute.Reset();
3982 fCompute.Start();
3983
3984 // Set not idle
3985 SetIdle(kFALSE);
3986
3987 // Cleanup the player
// 'deleteplayer' is switched off further down in the modes where the player
// is still needed after this function returns
3988 Bool_t deleteplayer = kTRUE;
3989 MakePlayer();
3990
3991 // Setup data set
3992 if (dset && (dset->IsA() == TDSetProxy::Class()))
3993 ((TDSetProxy*)dset)->SetProofServ(this);
3994
3995 // Get input data, if any
3996 TString emsg;
3997 if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
3998 Warning("HandleProcess", "could not get input data: %s", emsg.Data());
3999
4000 // Get query sequential number
4001 if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
4002 Warning("HandleProcess", "could not get query sequential number!");
4003
4004 // Make the ordinal number available in the selector
// Any "PROOF_Ordinal" already in the list is removed first, so that exactly
// one entry, carrying this node's ordinal, is present
4005 TObject *nord = 0;
4006 while ((nord = input->FindObject("PROOF_Ordinal")))
4007 input->Remove(nord);
4008 input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
4009
4010 // Set input
4011 TIter next(input);
4012 TObject *o = 0;
4013 while ((o = next())) {
4014 PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
4015 fPlayer->AddInput(o);
4016 }
4017
4018 // Check if a TSelector object is passed via input list
// The first object inheriting from TSelector is used, and its class name
// replaces the selector file name received in the message
4019 TObject *obj = 0;
4020 TSelector *selector_obj = 0;
4021 TIter nxt(input);
4022 while ((obj = nxt())){
4023 if (obj->InheritsFrom("TSelector")) {
4024 selector_obj = (TSelector *) obj;
4025 filename = selector_obj->ClassName();
4026 Info("HandleProcess", "selector obj for '%s' found", selector_obj->ClassName());
4027 break;
4028 }
4029 }
4030
4031 // Signal the master that we are starting processing
4033
4034 // Reset latency stopwatch
4035 fLatency.Reset();
4037
4038 // Process
4039 PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
4040
4041 if (selector_obj){
4042 Info("HandleProcess", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4043 fPlayer->Process(dset, selector_obj, opt, nentries, first);
4044 }
4045 else {
4046 Info("HandleProcess", "calling fPlayer->Process() with selector name: %s", filename.Data());
4047 fPlayer->Process(dset, filename, opt, nentries, first);
4048 }
4049
4050 // Return number of events processed
4053 if (fProtocol > 18) {
4054 TProofProgressStatus* status =
4056 gPerfStats?gPerfStats->GetBytesRead():0);
4057 if (status)
4058 m << status << abort;
// NOTE(review): 'status' is dereferenced below whenever 'slb' is set, i.e.
// outside the 'if (status)' guard above -- confirm it can never be null.
4059 if (slb)
4060 slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
4061 status->GetEntries(), status->GetBytesRead());
4062 SafeDelete(status);
4063 } else {
4064 m << fPlayer->GetEventsProcessed() << abort;
4065 if (slb)
4066 slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
4067 }
4068
4069 fSocket->Send(m);
4070 PDB(kGlobal, 2)
4071 Info("TProofServ::Handleprocess",
4072 "worker %s has finished processing with %d objects in output list",
4074
4075 // Cleanup the input data set info
4076 SafeDelete(dset);
4077 SafeDelete(enl);
4078 SafeDelete(evl);
4079
4082 if (outok) {
4083 // Check if in controlled output sending mode or submerging
4084 Int_t cso = 0;
4085 Bool_t isSubMerging = kFALSE;
4086
4087 // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4088 Int_t nm = 0;
4089 if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
4090 isSubMerging = (nm >= 0) ? kTRUE : kFALSE;
4091 }
// Controlled sending is considered only when not submerging: the default is
// read from gEnv, a "PROOF_ControlSendOutput" input parameter may replace
// it, and the gEnv value is read again if that lookup fails
4092 if (!isSubMerging) {
4093 cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4094 if (TProof::GetParameter(input, "PROOF_ControlSendOutput", cso) != 0)
4095 cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4096 }
4097
4098 if (cso > 0) {
4099
4100 // Control output sending mode: wait for the master to ask for the objects.
4101 // Allows controls of memory usage on the master.
4103 fSocket->Send(msg);
4104
4105 // Set idle
4106 SetIdle(kTRUE);
4107
4108 // Do not cleanup the player yet: it will be used in sending output activities
4109 deleteplayer = kFALSE;
4110
4111 PDB(kGlobal, 1)
4112 Info("HandleProcess", "controlled mode: worker %s has finished,"
4113 " sizes sent to master", fOrdinal.Data());
4114 } else {
4115
4116 // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4118 if (isSubMerging)
4119 Info("HandleProcess", "submerging disabled because of high-memory case");
4120 isSubMerging = kFALSE;
4121 } else {
4122 PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isSubMerging);
4123 }
4124
4125 if (!IsMaster() && isSubMerging) {
4126 // Worker in merging mode.
4127 //----------------------------
4128 // First, it reports only the size of its output to the master
4129 // + port on which it can possibly accept outputs from other workers if it becomes a merger
4130 // Master will later tell it where it should send the output (either to the master or to some merger)
4131 // or if it should become a merger
4132
4133 TMessage msg_osize(kPROOF_SUBMERGER);
4134 msg_osize << Int_t(TProof::kOutputSize);
4135 msg_osize << fPlayer->GetOutputList()->GetEntries();
4136
// The port sent to the master stays 0 when no merging socket is available
4138 Int_t merge_port = 0;
4139 if (fMergingSocket) {
4140 PDB(kGlobal, 2)
4141 Info("HandleProcess", "possible port for merging connections: %d",
4143 merge_port = fMergingSocket->GetLocalPort();
4144 }
4145 msg_osize << merge_port;
4146 fSocket->Send(msg_osize);
4147
4148 // Set idle
4149 SetIdle(kTRUE);
4150
4151 // Do not cleanup the player yet: it will be used in sub-merging activities
4152 deleteplayer = kFALSE;
4153
4154 PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
4155
4156 } else {
4157 // Sub-master OR worker not in merging mode
4158 // ---------------------------------------------
4159 PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
4161 Warning("HandleProcess","problems sending output list");
4162
4163 // Masters reset the mergers, if any
4164 if (IsMaster()) fProof->ResetMergers();
4165
4166 // Signal the master that we are idle
4168
4169 // Set idle
4170 SetIdle(kTRUE);
4171
4172 // Notify the user
4173 SendLogFile();
4174 }
4175
4176
4177
4178 }
4179
4180 } else {
4181 // No output list
4183 Warning("HandleProcess","the output list is empty!");
4184 if (SendResults(fSocket) != 0)
4185 Warning("HandleProcess", "problems sending output list");
4186
4187 // Masters reset the mergers, if any
4188 if (IsMaster()) fProof->ResetMergers();
4189
4190 // Signal the master that we are idle
4192
4193 // Set idle
4194 SetIdle(kTRUE);
4195
4196 // Notify the user
4197 SendLogFile();
4198 }
4199
4200 // Prevent from double-deleting in input
// Selector objects are only detached from the list here, not deleted
4201 TIter nex(input);
4202 while ((obj = nex())) {
4203 if (obj->InheritsFrom("TSelector")) input->Remove(obj);
4204 }
4205
4206 // Make also sure the input list objects are deleted
4208
4209 // Remove possible inputs from a file and the file, if any
4210 TList *added = dynamic_cast<TList *>(input->FindObject("PROOF_InputObjsFromFile"));
4211 if (added) {
4212 if (added->GetSize() > 0) {
4213 // The file must be the last one
4214 TFile *f = dynamic_cast<TFile *>(added->Last());
4215 if (f) {
// Detach the objects listed in 'added' from 'input' and empty 'added'
// without deleting its content, then close and delete the file
4216 added->Remove(f);
4217 TIter nxo(added);
4218 while ((o = nxo())) { input->Remove(o); }
4219 input->Remove(added);
4220 added->SetOwner(kFALSE);
4221 added->Clear();
4222 f->Close();
4223 delete f;
4224 }
4225 }
4226 SafeDelete(added);
4227 }
4228 input->SetOwner();
4230
4231 // Cleanup if required
4232 if (deleteplayer) DeletePlayer();
4233 }
4234
4235 PDB(kGlobal, 1) Info("HandleProcess", "done");
4236
4237 // Done
4238 return;
4239}
4240
4241////////////////////////////////////////////////////////////////////////////////
4242/// Sends all objects from the given list to the specified socket
///
/// The way the objects are sent depends on fProtocol:
///  - fProtocol > 23 (and `outlist` set): objects are packed into a message
///    which is sent, and restarted, once its length exceeds fMsgSizeHWM;
///  - fProtocol > 10 (and `outlist` set): one message per object;
///  - top master with fProtocol > 6 (and `outlist` set): one message into
///    which `pq` is written;
///  - otherwise: `outlist` (possibly null) is sent with SendObject() as
///    kPROOF_OUTPUTLIST.
/// In the first two modes every object is preceded by an Int_t tag: 2 for the
/// last object, 1 for the others; on the top master, when `pq` is set, a
/// message with tag 0 followed by `pq` is sent first. Messages are compressed
/// beforehand when GetCompressionLevel() > 0 so that the size shown in the
/// progress messages is the compressed one.
///
/// Returns 0 on success and -1 as soon as a send on `sock` fails.
4243
4245{
4246 PDB(kOutput, 2) Info("SendResults", "enter");
4247
4248 TString msg;
4249 if (fProtocol > 23 && outlist) {
4250 // Send objects in bunches of max fMsgSizeHWM bytes to optimize transfer
4251 // Objects are merged one-by-one by the client
4252 // Messages for objects
4254 // Objects in the output list
4255 Int_t olsz = outlist->GetSize();
4256 if (IsTopMaster() && pq) {
4257 msg.Form("%s: merging output objects ... done ",
4258 fPrefix.Data());
4259 SendAsynMessage(msg.Data());
4260 // Message for the client
4261 msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4262 SendAsynMessage(msg.Data(), kFALSE);
4263 // Send light query info
4264 mbuf << (Int_t) 0;
4265 mbuf.WriteObject(pq);
4266 if (sock->Send(mbuf) < 0) return -1;
4267 }
4268 // Objects in the output list
// ns: objects packed so far in total; np: objects in the message being built
4269 Int_t ns = 0, np = 0;
4270 TIter nxo(outlist);
4271 TObject *o = 0;
4272 Int_t totsz = 0, objsz = 0;
4273 mbuf.Reset();
4274 while ((o = nxo())) {
// The size is checked before the next object is added, so a message is
// sent only after it has already grown beyond fMsgSizeHWM
4275 if (mbuf.Length() > fMsgSizeHWM) {
4276 PDB(kOutput, 1)
4277 Info("SendResults",
4278 "message has %d bytes: limit of %lld bytes reached - sending ...",
4279 mbuf.Length(), fMsgSizeHWM);
4280 // Compress the message, if required; for these messages we do it already
4281 // here so we get the size; TXSocket does not do it twice.
4282 if (GetCompressionLevel() > 0) {
4284 mbuf.Compress();
4285 objsz = mbuf.CompLength();
4286 } else {
4287 objsz = mbuf.Length();
4288 }
4289 totsz += objsz;
4290 if (IsTopMaster()) {
4291 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4292 fPrefix.Data(), ns, olsz, objsz);
4293 SendAsynMessage(msg.Data(), kFALSE);
4294 }
4295 if (sock->Send(mbuf) < 0) return -1;
4296 // Reset the message
4297 mbuf.Reset();
4298 np = 0;
4299 }
4300 ns++;
4301 np++;
// Tag 2 marks the last object of the list, tag 1 any other object
4302 mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
4303 mbuf << o;
4304 }
// Send what is left in the last, partially filled, message
4305 if (np > 0) {
4306 // Compress the message, if required; for these messages we do it already
4307 // here so we get the size; TXSocket does not do it twice.
4308 if (GetCompressionLevel() > 0) {
4310 mbuf.Compress();
4311 objsz = mbuf.CompLength();
4312 } else {
4313 objsz = mbuf.Length();
4314 }
4315 totsz += objsz;
4316 if (IsTopMaster()) {
4317 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4318 fPrefix.Data(), ns, olsz, objsz);
4319 SendAsynMessage(msg.Data(), kFALSE);
4320 }
4321 if (sock->Send(mbuf) < 0) return -1;
4322 }
4323 if (IsTopMaster()) {
4324 // Send total size
4325 msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4326 fPrefix.Data(), olsz, totsz);
4327 SendAsynMessage(msg.Data());
4328 }
4329 } else if (fProtocol > 10 && outlist) {
4330
4331 // Send objects one-by-one to optimize transfer and merging
4332 // Messages for objects
4334 // Objects in the output list
4335 Int_t olsz = outlist->GetSize();
4336 if (IsTopMaster() && pq) {
4337 msg.Form("%s: merging output objects ... done ",
4338 fPrefix.Data());
4339 SendAsynMessage(msg.Data());
4340 // Message for the client
4341 msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4342 SendAsynMessage(msg.Data(), kFALSE);
4343 // Send light query info
4344 mbuf << (Int_t) 0;
4345 mbuf.WriteObject(pq);
4346 if (sock->Send(mbuf) < 0) return -1;
4347 }
4348
4349 Int_t ns = 0;
4350 Int_t totsz = 0, objsz = 0;
// NOTE(review): the loop runs over fPlayer->GetOutputList() while 'olsz'
// (used for the "last object" tag) comes from 'outlist' -- confirm that
// both always refer to the same list.
4351 TIter nxo(fPlayer->GetOutputList());
4352 TObject *o = 0;
4353 while ((o = nxo())) {
4354 ns++;
4355 mbuf.Reset();
4356 Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
4357 mbuf << type;
4358 mbuf.WriteObject(o);
4359 // Compress the message, if required; for these messages we do it already
4360 // here so we get the size; TXSocket does not do it twice.
4361 if (GetCompressionLevel() > 0) {
4363 mbuf.Compress();
4364 objsz = mbuf.CompLength();
4365 } else {
4366 objsz = mbuf.Length();
4367 }
4368 totsz += objsz;
4369 if (IsTopMaster()) {
4370 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4371 fPrefix.Data(), ns, olsz, objsz);
4372 SendAsynMessage(msg.Data(), kFALSE);
4373 }
4374 if (sock->Send(mbuf) < 0) return -1;
4375 }
4376 // Total size
4377 if (IsTopMaster()) {
4378 // Send total size
4379 msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4380 fPrefix.Data(), olsz, totsz);
4381 SendAsynMessage(msg.Data());
4382 }
4383
4384 } else if (IsTopMaster() && fProtocol > 6 && outlist) {
4385
4386 // Buffer to be sent
4388 mbuf.WriteObject(pq);
4389 // Sizes
// NOTE(review): CompLength() is read although no Compress() call is visible
// in this branch -- confirm the size reported to the client is meaningful.
4390 Int_t blen = mbuf.CompLength();
4391 Int_t olsz = outlist->GetSize();
4392 // Message for the client
4393 msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
4394 SendAsynMessage(msg.Data(), kFALSE);
4395 if (sock->Send(mbuf) < 0) return -1;
4396
4397 } else {
// Fallback: also taken when 'outlist' is null, in which case the null list
// sent below notifies the failure or abort
4398 if (outlist) {
4399 PDB(kGlobal, 2) Info("SendResults", "sending output list");
4400 } else {
4401 PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
4402 }
4403 if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
4404 }
4405
4406 PDB(kOutput,2) Info("SendResults", "done");
4407
4408 // Done
4409 return 0;
4410}
4411
4412////////////////////////////////////////////////////////////////////////////////
4413/// process the next query from the queue of submitted jobs.
4414/// to be called on the top master only.
4415
4417{
4418 TDSet *dset = 0;
4419 TString filename, opt;
4420 TList *input = 0;
4421 Long64_t nentries = -1, first = 0;
4422
4423 // TObject *elist = 0;
4424 TProofQueryResult *pq = 0;
4425
4426 TObject* obj = 0;
4427 TSelector* selector_obj = 0;
4428
4429 // Process
4430
4431 // Reset compute stopwatch: we include all what done from now on
4432 fCompute.Reset();
4433 fCompute.Start();
4434
4435 // Get next query info (also removes query from the list)
4436 pq = NextQuery();
4437 if (pq) {
4438
4439 // Set not idle
4440 SetIdle(kFALSE);
4441 opt = pq->GetOptions();
4442 input = pq->GetInputList();
4443 nentries = pq->GetEntries();
4444 first = pq->GetFirst();
4445 filename = pq->GetSelecImp()->GetName();
4446 Ssiz_t id = opt.Last('#');
4447 if (id != kNPOS && id < opt.Length() - 1) {
4448 filename += opt(id + 1, opt.Length());
4449 // Remove it from 'opt' so user found on the workers what they specified
4450 opt.Remove(id);
4451 }
4452 // Attach to data set and entry- (or event-) list (if any)
4453 TObject *o = 0;
4454 if ((o = pq->GetInputObject("TDSet"))) {
4455 dset = (TDSet *) o;
4456 } else {
4457 // Should never get here
4458 Error("ProcessNext", "no TDset object: cannot continue");
4459 return;
4460 }
4461 // elist = 0;
4462 // if ((o = pq->GetInputObject("TEntryList")))
4463 // elist = o;
4464 // else if ((o = pq->GetInputObject("TEventList")))
4465 // elist = o;
4466
4467 // Expand selector files
4468 if (pq->GetSelecImp()) {
4469 gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
4470 pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
4471 }
4472 if (pq->GetSelecHdr() &&
4473 !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
4474 gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
4475 pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
4476 }
4477
4478 // Taking out a TSelector object from input list
4479 TIter nxt(input);
4480 while ((obj = nxt())){
4481 if (obj->InheritsFrom("TSelector") &&
4482 !strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
4483 selector_obj = (TSelector *) obj;
4484 Info("ProcessNext", "found object for selector '%s'", obj->ClassName());
4485 break;
4486 }
4487 }
4488
4489 } else {
4490 // Should never get here
4491 Error("ProcessNext", "empty waiting queries list!");
4492 return;
4493 }
4494
4495 // Set in running state
4496 SetQueryRunning(pq);
4497
4498 // Save to queries dir, if not standard draw
4499 if (fQMgr) {
4500 if (!(pq->IsDraw()))
4501 fQMgr->SaveQuery(pq);
4502 else
4504 fQMgr->ResetTime();
4505 }
4506
4507 // Signal the client that we are starting a new query
4509 m << TString(pq->GetSelecImp()->GetName())
4510 << dset->GetNumOfFiles()
4511 << pq->GetFirst() << pq->GetEntries();
4512 fSocket->Send(m);
4513
4514 // Create player
4515 MakePlayer();
4516
4517 // Add query results to the player lists
4519
4520 // Set query currently processed
4522
4523 // Setup data set
4524 if (dset->IsA() == TDSetProxy::Class())
4525 ((TDSetProxy*)dset)->SetProofServ(this);
4526
4527 // Add the unique query tag as TNamed object to the input list
4528 // so that it is available in TSelectors for monitoring
4529 TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
4530 input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
4531 // ... and the sequential number
4532 fQuerySeqNum = pq->GetSeqNum();
4533 input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
4534
4535 // Check whether we have to enforce the use of submergers, but only if the user did
4536 // not express itself on the subject
4537 if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
4538 Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
4539 if (smg >= 0) {
4540 input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
4541 PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
4542 if (gEnv->Lookup("Proof.MergersByHost")) {
4543 Int_t mbh = gEnv->GetValue("Proof.MergersByHost", 0);
4544 if (mbh != 0) {
4545 // Administrator settings have the priority
4546 TObject *o = 0;
4547 if ((o = input->FindObject("PROOF_MergersByHost"))) { input->Remove(o); delete o; }
4548 input->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
4549 PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
4550 }
4551 }
4552 }
4553 }
4554
4555 // Set input
4556 TIter next(input);
4557 TObject *o = 0;
4558 while ((o = next())) {
4559 PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
4560 fPlayer->AddInput(o);
4561 }
4562
4563 // Remove the list of the missing files from the original list, if any
4564 if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
4565
4566 // Process
4567 PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
4568 if (selector_obj){
4569 Info("ProcessNext", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4570 fPlayer->Process(dset, selector_obj, opt, nentries, first);
4571 }
4572 else {
4573 Info("ProcessNext", "calling fPlayer->Process() with selector name: %s", filename.Data());
4574 fPlayer->Process(dset, filename, opt, nentries, first);
4575 }
4576
4577 // This is the end of merging
4579
4580 // Return number of events processed
4581 Bool_t abort =
4584 m.Reset(kPROOF_STOPPROCESS);
4585 // message sent from worker to the master
4586 if (fProtocol > 18) {
4588 m << status << abort;
4589 status = 0; // the status belongs to the player.
4590 } else if (fProtocol > 8) {
4591 m << fPlayer->GetEventsProcessed() << abort;
4592 } else {
4594 }
4595 fSocket->Send(m);
4596 }
4597
4598 // Register any dataset produced during this processing, if required
4600 TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
4601 if (psr) {
4602 TString emsg;
4604 Warning("ProcessNext", "problems registering produced datasets: %s", emsg.Data());
4605 do {
4606 fPlayer->GetOutputList()->Remove(psr);
4607 delete psr;
4608 } while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet")));
4609 }
4610 }
4611
4612 // Complete filling of the TQueryResult instance
4613 if (fQMgr && !pq->IsDraw()) {
4614 if (!abort) fProof->AskStatistics();
4615 if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
4617 }
4618
4619 // If we were requested to save results on the master and we are not in save-to-file mode
4620 // then we save the results
4621 if (IsTopMaster() && fPlayer->GetOutputList()) {
4622 Bool_t save = kTRUE;
4623 TIter nxo(fPlayer->GetOutputList());
4624 TObject *xo = 0;
4625 while ((xo = nxo())) {
4626 if (xo->InheritsFrom("TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
4627 save = kFALSE;
4628 break;
4629 }
4630 }
4631 if (save) {
4632 TNamed *nof = (TNamed *) input->FindObject("PROOF_DefaultOutputOption");