Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TProofServ.cxx
Go to the documentation of this file.
1// @(#)root/proof:$Id$
2// Author: Fons Rademakers 16/02/97
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TProofServ
13\ingroup proofkernel
14
15Class providing the PROOF server. It can act either as the master
16server or as a slave server, depending on its startup arguments. It
17receives and handles messages coming from the client or from the
18master server.
19
20*/
21
22#include "RConfigure.h"
23#include <ROOT/RConfig.hxx>
24#include "Riostream.h"
25
26#ifdef WIN32
27 #include <process.h>
28 #include <io.h>
29 #include "snprintf.h"
30 typedef long off_t;
31#endif
32#include <errno.h>
33#include <time.h>
34#include <fcntl.h>
35#include <sys/types.h>
36#include <sys/stat.h>
37#ifndef WIN32
38#include <sys/wait.h>
39#endif
40#include <cstdlib>
41
42// To handle exceptions
43#include <exception>
44#include <new>
45
46using namespace std;
47
48#if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
49 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
50 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
51#include <sys/file.h>
52#define lockf(fd, op, sz) flock((fd), (op))
53#ifndef F_LOCK
54#define F_LOCK (LOCK_EX | LOCK_NB)
55#endif
56#ifndef F_ULOCK
57#define F_ULOCK LOCK_UN
58#endif
59#endif
60
61#include "TProofServ.h"
62#include "TDSetProxy.h"
63#include "TEnv.h"
64#include "TError.h"
65#include "TEventList.h"
66#include "TEntryList.h"
67#include "TException.h"
68#include "TFile.h"
69#include "THashList.h"
70#include "TInterpreter.h"
71#include "TKey.h"
72#include "TMessage.h"
73#include "TVirtualPerfStats.h"
74#include "TProofDebug.h"
75#include "TProof.h"
76#include "TVirtualProofPlayer.h"
77#include "TProofQueryResult.h"
78#include "TQueryResultManager.h"
79#include "TRegexp.h"
80#include "TROOT.h"
81#include "TObjArray.h"
82#include "TSocket.h"
83#include "TStopwatch.h"
84#include "TSystem.h"
85#include "TTimeStamp.h"
86#include "TUrl.h"
87#include "TPluginManager.h"
88#include "TObjString.h"
89#include "compiledata.h"
91#include "TProofNodeInfo.h"
92#include "TFileInfo.h"
93#include "TClass.h"
94#include "TSQLServer.h"
95#include "TSQLResult.h"
96#include "TSQLRow.h"
97#include "TPRegexp.h"
98#include "TParameter.h"
99#include "TMap.h"
100#include "TSortedList.h"
101#include "TFileCollection.h"
102#include "TLockFile.h"
103#include "TDataSetManagerFile.h"
104#include "TProofProgressStatus.h"
105#include "TServerSocket.h"
106#include "TMonitor.h"
107#include "TProofOutputFile.h"
108#include "TSelector.h"
109#include "TPackMgr.h"
110
111// global proofserv handle
113
114// debug hook
115static volatile Int_t gProofServDebug = 1;
116
117// Syslog control
120TString TProofServ::fgSysLogEntity("undef:default");
121
122// File where to log: default stderr
124
125// Integrate with crash reporter.
126#ifdef __APPLE__
127extern "C" {
128static const char *__crashreporter_info__ = 0;
129asm(".desc ___crashreporter_info__, 0x10");
130}
131#endif
132
133// To control allowed actions while processing
135
136// Last message and entry before exceptions
139
140// Memory controllers
145
146// Async Logger
147static void SendAsynMsg(const char *msg) {
149}
150
151//----- Termination signal handler ---------------------------------------------
152////////////////////////////////////////////////////////////////////////////////
153
156public:
159 Bool_t Notify() override;
160};
161
162////////////////////////////////////////////////////////////////////////////////
163/// Handle this interrupt
164
166{
167 Printf("Received SIGTERM: terminating");
169 return kTRUE;
170}
171
172//----- Interrupt signal handler -----------------------------------------------
173////////////////////////////////////////////////////////////////////////////////
174
177public:
180 Bool_t Notify() override;
181};
182
183////////////////////////////////////////////////////////////////////////////////
184/// Handle this interrupt
185
187{
189 if (TROOT::Initialized()) {
190 Throw(GetSignal());
191 }
192 return kTRUE;
193}
194
195//----- SigPipe signal handler -------------------------------------------------
196////////////////////////////////////////////////////////////////////////////////
197
200public:
202 { fServ = s; }
203 Bool_t Notify() override;
204};
205
206////////////////////////////////////////////////////////////////////////////////
207/// Handle this signal
208
210{
212 return kTRUE;
213}
214
215//----- Input handler for messages from parent or master -----------------------
216////////////////////////////////////////////////////////////////////////////////
217
220public:
222 { fServ = s; }
223 Bool_t Notify() override;
224 Bool_t ReadNotify() override { return Notify(); }
225};
226
227////////////////////////////////////////////////////////////////////////////////
228/// Handle this input
229
231{
233 return kTRUE;
234}
235
236TString TProofServLogHandler::fgPfx = ""; // Default prefix to be prepended to messages
237Int_t TProofServLogHandler::fgCmdRtn = 0; // Return code of the command execution (available only
238 // after closing the pipe)
239////////////////////////////////////////////////////////////////////////////////
240/// Execute 'cmd' in a pipe and handle output messages from the related file
241
243 TSocket *s, const char *pfx)
244 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
245{
247 fgCmdRtn = 0;
248 fFile = 0;
249 if (s && cmd) {
250 fFile = gSystem->OpenPipe(cmd, "r");
251 if (fFile) {
252 SetFd(fileno(fFile));
253 // Notify what already in the file
254 Notify();
255 // Used in the destructor
257 } else {
258 fSocket = 0;
259 Error("TProofServLogHandler", "executing command in pipe");
260 fgCmdRtn = -1;
261 }
262 } else {
263 Error("TProofServLogHandler",
264 "undefined command (%p) or socket (%p)", (int *)cmd, s);
265 }
266}
267////////////////////////////////////////////////////////////////////////////////
268/// Handle available message from the open file 'f'
269
271 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
272{
274 fgCmdRtn = 0;
275 fFile = 0;
276 if (s && f) {
277 fFile = f;
278 SetFd(fileno(fFile));
279 // Notify what already in the file
280 Notify();
281 } else {
282 Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
283 }
284}
285////////////////////////////////////////////////////////////////////////////////
286/// Handle available message in the open file
287
289{
290 if (TestBit(kFileIsPipe) && fFile) {
292#ifdef WIN32
293 fgCmdRtn = rc;
294#else
295 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
296#endif
297 }
298 fFile = 0;
299 fSocket = 0;
301}
302////////////////////////////////////////////////////////////////////////////////
303/// Handle available message in the open file
304
306{
307 if (IsValid()) {
309 // Read buffer
310 char line[4096];
311 char *plf = 0;
312 while (fgets(line, sizeof(line), fFile)) {
313 if ((plf = strchr(line, '\n')))
314 *plf = 0;
315 // Create log string
316 TString log;
317 if (fPfx.Length() > 0) {
318 // Prepend prefix specific to this instance
319 log.Form("%s: %s", fPfx.Data(), line);
320 } else if (fgPfx.Length() > 0) {
321 // Prepend default prefix
322 log.Form("%s: %s", fgPfx.Data(), line);
323 } else {
324 // Nothing to prepend
325 log = line;
326 }
327 // Send the message one level up
328 m.Reset(kPROOF_MESSAGE);
329 m << log;
330 fSocket->Send(m);
331 }
332 }
333 return kTRUE;
334}
335////////////////////////////////////////////////////////////////////////////////
336/// Static method to set the default prefix
337
339{
340 fgPfx = pfx;
341}
342////////////////////////////////////////////////////////////////////////////////
343/// Static method to get the return code from the execution of a command via
344/// the pipe. This is always 0 when the log handler is not used with a pipe
345
347{
348 return fgCmdRtn;
349}
350
351////////////////////////////////////////////////////////////////////////////////
352/// Init a guard for executing a command in a pipe
353
355 const char *pfx, Bool_t on)
356{
357 fExecHandler = 0;
358 if (cmd && on) {
359 fExecHandler = new TProofServLogHandler(cmd, s, pfx);
360 if (fExecHandler->IsValid()) {
362 } else {
363 Error("TProofServLogHandlerGuard","invalid handler");
364 }
365 } else {
366 if (on)
367 Error("TProofServLogHandlerGuard","undefined command");
368 }
369}
370
371////////////////////////////////////////////////////////////////////////////////
372/// Init a guard for handling the messages from an already open file
373
375 const char *pfx, Bool_t on)
376{
377 fExecHandler = 0;
378 if (f && on) {
379 fExecHandler = new TProofServLogHandler(f, s, pfx);
380 if (fExecHandler->IsValid()) {
382 } else {
383 Error("TProofServLogHandlerGuard","invalid handler");
384 }
385 } else {
386 if (on)
387 Error("TProofServLogHandlerGuard","undefined file");
388 }
389}
390
391////////////////////////////////////////////////////////////////////////////////
392/// Close a guard for executing a command in a pipe
393
395{
399 }
400}
401
402//--- Special timer to control delayed shutdowns ----------------------------//
403////////////////////////////////////////////////////////////////////////////////
404/// Constructor
405
407 : TTimer(delay, kFALSE), fProofServ(p)
408{
409 fTimeout = gEnv->GetValue("ProofServ.ShutdownTimeout", 20);
410 // Backward compatibility: until 5.32 the variable was called ProofServ.ShutdonwTimeout (sic)
411 fTimeout = gEnv->GetValue("ProofServ.ShutdonwTimeout", fTimeout);
412}
413
414////////////////////////////////////////////////////////////////////////////////
415/// Handle expiration of the shutdown timer. In the case of low activity the
416/// process will be aborted.
417
419{
420 if (gDebug > 0)
421 printf("TShutdownTimer::Notify: checking activity on the input socket\n");
422
423 // Check activity on the socket
424 TSocket *xs = 0;
425 if (fProofServ && (xs = fProofServ->GetSocket())) {
426 TTimeStamp now;
427 TTimeStamp ts = xs->GetLastUsage();
428 Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
429 (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
430 if (dt > fTimeout * 60000) {
431 printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
432 " during the last %d mins: aborting\n", xs, fTimeout);
433 // At this point we lost our controller: we need to abort to avoid
434 // hidden timeouts or loops
435 gSystem->Abort();
436 } else {
437 if (gDebug > 0)
438 printf("TShutdownTimer::Notify: input socket: %p: show activity"
439 " %ld secs ago\n", xs, dt / 60000);
440 }
441 }
442 // Needed for the next shot
443 Reset();
444 return kTRUE;
445}
446
447//--- Synchronous timer used to reap children processes change of state ------//
448////////////////////////////////////////////////////////////////////////////////
449/// Destructor
450
452{
453 if (fChildren) {
455 delete fChildren;
456 fChildren = 0;
457 }
458}
459
460////////////////////////////////////////////////////////////////////////////////
461/// Add an entry for 'pid' in the internal list
462
464{
465 if (pid > 0) {
466 if (!fChildren)
467 fChildren = new TList;
468 TString spid;
469 spid.Form("%d", pid);
470 fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
471 TurnOn();
472 }
473}
474
475////////////////////////////////////////////////////////////////////////////////
476/// Check if any of the registered children has changed its state.
477/// Unregister those that are gone.
478
480{
481 if (fChildren) {
482 TIter nxp(fChildren);
483 TParameter<Int_t> *p = 0;
484 while ((p = (TParameter<Int_t> *)nxp())) {
485 int status;
486#ifndef WIN32
487 pid_t pid;
488 do {
489 pid = waitpid(p->GetVal(), &status, WNOHANG);
490 } while (pid < 0 && errno == EINTR);
491#else
492 intptr_t pid;
493 pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
494#endif
495 if (pid > 0 && pid == p->GetVal()) {
496 // Remove from the list
498 delete p;
499 }
500 }
501 }
502
503 // Stop the timer if no children
504 if (!fChildren || fChildren->GetSize() <= 0) {
505 Stop();
506 } else {
507 // Needed for the next shot
508 Reset();
509 }
510 return kTRUE;
511}
512
513//--- Special timer to terminate idle sessions ----------------------------//
514////////////////////////////////////////////////////////////////////////////////
515/// Handle expiration of the idle timer. The session will just be terminated.
516
518{
519 Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
520
521 if (fProofServ) {
522 // Set the status to timed-out
523 Int_t uss_rc = -1;
524 if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
525 Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
526 // Send a terminate request
527 TString msg;
528 if (fProofServ->GetProtocol() < 29) {
529 msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
530 "// Please IGNORE any error message possibly displayed below\n//",
532 } else {
533 msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
535 }
538 Reset();
539 Stop();
540 } else {
541 Warning("Notify", "fProofServ undefined!");
542 Start(-1, kTRUE);
543 }
544 return kTRUE;
545}
546
548
549// Hook to the constructor. This is needed to avoid using the plugin manager
550// which may create problems in multi-threaded environments.
551extern "C" {
552 TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
553 { return new TProofServ(argc, argv, flog); }
554}
555
556////////////////////////////////////////////////////////////////////////////////
557/// Main constructor. Create an application environment. The TProofServ
558/// environment provides an eventloop via inheritance of TApplication.
559/// Actual server creation work is done in CreateServer() to allow
560/// overloading.
561
562TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
563 : TApplication("proofserv", argc, argv, 0, -1)
564{
565 // If test and tty, we are done
566 Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
567 if (xtest) {
568 Printf("proofserv: command line testing: OK");
569 exit(0);
570 }
571
572 // Read session specific rootrc file
573 TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
574 : "session.rootrc";
576 gEnv->ReadFile(rcfile, kEnvChange);
577
578 // Upper limit on Virtual Memory (in kB)
579 fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
580 if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
581 Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
582 if (mmx < kMaxLong && mmx > 0)
583 fgVirtMemMax = mmx * 1024;
584 }
585 // Old variable for backward compatibility
586 if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
587 Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
588 if (mmx < kMaxLong && mmx > 0)
589 fgVirtMemMax = mmx * 1024;
590 }
591 // Upper limit on Resident Memory (in kB)
592 fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
593 if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
594 Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
595 if (mmx < kMaxLong && mmx > 0)
596 fgResMemMax = mmx * 1024;
597 }
598 // Thresholds for warnings and stop processing
599 fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
600 fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
601 if (fgVirtMemMax > 0 || fgResMemMax > 0) {
602 if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
603 Warning("TProofServ", "requested memory fraction threshold to stop processing"
604 " (MemStop) out of range [0,1] - ignoring");
605 fgMemStop = 0.95;
606 }
607 if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
608 Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
609 " (MemHWM) out of range [0,MemStop] - ignoring");
610 fgMemHWM = 0.80;
611 }
612 }
613
614 // Wait (loop) to allow debugger to connect
615 Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
616 if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
617 (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
618 while (gProofServDebug)
619 ;
620 }
621
622 // Test instance
623 if (argc && *argc >= 4)
624 if (!strcmp(argv[3], "test"))
625 fService = "prooftest";
626
627 // crude check on number of arguments
628 if (argc && *argc < 2) {
629 Error("TProofServ", "Must have at least 1 arguments (see proofd).");
630 exit(1);
631 }
632
633 // Set global to this instance
634 gProofServ = this;
635
636 // Log control flags
638
639 // Abort on higher than kSysError's and set error handler
641 SetErrorHandlerFile(stderr);
643
644 fNcmd = 0;
645 fGroupPriority = 100;
647 fProtocol = 0;
648 fOrdinal = gEnv->GetValue("ProofServ.Ordinal", "-1");
649 fGroupId = -1;
650 fGroupSize = 0;
651 fRealTime = 0.0;
652 fCpuTime = 0.0;
653 fProof = 0;
654 fPlayer = 0;
655 fSocket = 0;
656
657 fTotSessions = -1;
658 fActSessions = -1;
659 fEffSessions = -1.;
660 fPackMgr = 0;
661
662 fLogFile = flog;
663 fLogFileDes = -1;
664
665 fArchivePath = "";
666 // Init lockers
667 fCacheLock = 0;
668 fQueryLock = 0;
669
670 fQMgr = 0;
672 fIdle = kTRUE;
673 fQuerySeqNum = -1;
674
675 fQueuedMsg = new TList;
676
678
679 fShutdownTimer = 0;
680 fReaperTimer = 0;
681 fIdleTOTimer = 0;
682
683 fDataSetManager = 0; // Initialized in Setup()
684 fDataSetStgRepo = 0; // Initialized in Setup()
685
686 fInputHandler = 0;
687
688 // Quotas disabled by default
689 fMaxQueries = -1;
690 fMaxBoxSize = -1;
691 fHWMBoxSize = -1;
692
693 // Submerger quantities
694 fMergingSocket = 0;
695 fMergingMonitor = 0;
696 fMergedWorkers = 0;
697
698 // Bit to flag high-memory footprint
700
701 // Max message size
702 fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
703
704 // Message compression
705 fCompressMsg = gEnv->GetValue("ProofServ.CompressMessage", 0);
706
707 gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
709
711 if (gProofDebugLevel > 0)
712 Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
713
714 // Max log file size
715 fLogFileMaxSize = -1;
716 TString logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
717 if (!logmx.IsNull()) {
718 Long64_t xf = 1;
719 if (!logmx.IsDigit()) {
720 if (logmx.EndsWith("K")) {
721 xf = 1024;
722 logmx.Remove(TString::kTrailing, 'K');
723 } else if (logmx.EndsWith("M")) {
724 xf = 1024*1024;
725 logmx.Remove(TString::kTrailing, 'M');
726 } else if (logmx.EndsWith("G")) {
727 xf = 1024*1024*1024;
728 logmx.Remove(TString::kTrailing, 'G');
729 }
730 }
731 if (logmx.IsDigit()) {
732 fLogFileMaxSize = logmx.Atoi() * xf;
733 if (fLogFileMaxSize > 0)
734 Info("TProofServ", "keeping the log file size within %lld bytes", fLogFileMaxSize);
735 } else {
736 logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
737 Warning("TProofServ", "bad formatted log file size limit ignored: '%s'", logmx.Data());
738 }
739 }
740
741 // Parse options
742 GetOptions(argc, argv);
743
744 // Default prefix in the form '<role>-<ordinal>'
745 fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
746 if (test) fPrefix = "Test";
747 if (fOrdinal != "-1")
748 fPrefix += fOrdinal;
750
751 // Syslog control
752 TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
753 if (!(slog.IsNull())) {
754 if (slog.IsDigit()) {
755 fgLogToSysLog = slog.Atoi();
756 } else {
757 char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
758 c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
759 Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
760 (c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
761 if (dosyslog) {
762 slog.Remove(0,1);
763 if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
764 if (fgLogToSysLog <= 0)
765 Warning("TProofServ", "request for syslog logging ineffective!");
766 }
767 }
768 }
769 // Initialize proper service if required
770 if (fgLogToSysLog > 0) {
771 fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
772 if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
774 }
775
776 // Enable optimized sending of streamer infos to use embedded backward/forward
777 // compatibility support between different ROOT versions and different versions of
778 // users classes
779 Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
780 if (enableSchemaEvolution) {
782 } else {
783 Info("TProofServ", "automatic schema evolution in TMessage explicitly disabled");
784 }
785}
786
787////////////////////////////////////////////////////////////////////////////////
788/// Finalize the server setup. If master, create the TProof instance to talk
789/// to the worker or submaster nodes.
790/// Return 0 on success, -1 on error
791
793{
794 // Get socket to be used (setup in proofd)
795 TString opensock = gSystem->Getenv("ROOTOPENSOCK");
796 if (opensock.Length() <= 0)
797 opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
798 Int_t sock = opensock.Atoi();
799 if (sock <= 0) {
800 Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
801 return -1;
802 }
803 fSocket = new TSocket(sock);
804
805 // Set compression level, if any
807
808 // debug hooks
809 if (IsMaster()) {
810 // wait (loop) in master to allow debugger to connect
811 if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
812 while (gProofServDebug)
813 ;
814 }
815 } else {
816 // wait (loop) in slave to allow debugger to connect
817 if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
818 while (gProofServDebug)
819 ;
820 }
821 }
822
823 if (gProofDebugLevel > 0)
824 Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
826
827 if (Setup() != 0) {
828 // Setup failure
829 LogToMaster();
830 SendLogFile();
831 Terminate(0);
832 return -1;
833 }
834
835 // Set the default prefix in the form '<role>-<ordinal>' (it was already done
836 // in the constructor, but for standard PROOF the ordinal number is only set in
837 // Setup(), so we need to do it again here)
838 TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
839 pfx += GetOrdinal();
841
842 if (!fLogFile) {
844 // If for some reason we failed setting a redirection file for the logs
845 // we cannot continue
846 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
847 LogToMaster();
848 SendLogFile(-98);
849 Terminate(0);
850 return -1;
851 }
852 } else {
853 // Use the file already open by pmain
854 if ((fLogFileDes = fileno(fLogFile)) < 0) {
855 LogToMaster();
856 SendLogFile(-98);
857 Terminate(0);
858 return -1;
859 }
860 }
861
862 // Send message of the day to the client
863 if (IsMaster()) {
864 if (CatMotd() == -1) {
865 LogToMaster();
866 SendLogFile(-99);
867 Terminate(0);
868 return -1;
869 }
870 }
871
872 // Everybody expects std::iostream to be available, so load it...
873 ProcessLine("#include <iostream>", kTRUE);
874 ProcessLine("#include <string>",kTRUE); // for std::string std::iostream.
875
876 // The following libs are also useful to have, make sure they are loaded...
877 //gROOT->LoadClass("TMinuit", "Minuit");
878 //gROOT->LoadClass("TPostScript", "Postscript");
879
880 // Load user functions
881 const char *logon;
882 logon = gEnv->GetValue("Proof.Load", (char *)0);
883 if (logon) {
884 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
885 if (mac)
886 ProcessLine(TString::Format(".L %s", logon), kTRUE);
887 delete [] mac;
888 }
889
890 // Execute logon macro
891 logon = gEnv->GetValue("Proof.Logon", (char *)0);
892 if (logon && !NoLogOpt()) {
893 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
894 if (mac)
895 ProcessFile(logon);
896 delete [] mac;
897 }
898
899 // Save current interpreter context
900 gInterpreter->SaveContext();
901 gInterpreter->SaveGlobalsContext();
902
903 // Install interrupt and message input handlers
906 fInputHandler = new TProofServInputHandler(this, sock);
908
909 // if master, start slave servers
910 if (IsMaster()) {
911 TString master = "proof://__master__";
913 if (a.IsValid()) {
914 master += ":";
915 master += a.GetPort();
916 }
917
918 // Get plugin manager to load appropriate TProof from
919 TPluginManager *pm = gROOT->GetPluginManager();
920 if (!pm) {
921 Error("CreateServer", "no plugin manager found");
922 SendLogFile(-99);
923 Terminate(0);
924 return -1;
925 }
926
927 // Find the appropriate handler
928 TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
929 if (!h) {
930 Error("CreateServer", "no plugin found for TProof with a"
931 " config file of '%s'", fConfFile.Data());
932 SendLogFile(-99);
933 Terminate(0);
934 return -1;
935 }
936
937 // load the plugin
938 if (h->LoadPlugin() == -1) {
939 Error("CreateServer", "plugin for TProof could not be loaded");
940 SendLogFile(-99);
941 Terminate(0);
942 return -1;
943 }
944
945 // make instance of TProof
946 fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
947 fConfFile.Data(),
948 GetConfDir(),
949 fLogLevel, 0));
950 if (!fProof || !fProof->IsValid()) {
951 Error("CreateServer", "plugin for TProof could not be executed");
953 SendLogFile(-99);
954 Terminate(0);
955 return -1;
956 }
957 // Find out if we are a master in direct contact only with workers
959
960 SendLogFile();
961 }
962
963 // Setup the shutdown timer
964 if (!fShutdownTimer) {
965 // Check activity on socket every 5 mins
966 fShutdownTimer = new TShutdownTimer(this, 300000);
968 }
969
970 // Check if schema evolution is effective: clients running versions <=17 do not
971 // support that: send a warning message
972 if (fProtocol <= 17) {
973 TString msg;
974 msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
975 " This may generate compatibility problems between streamed objects.\n"
976 " The advise is to move to ROOT >= 5.21/02 .");
977 SendAsynMessage(msg.Data());
978 }
979
980 // Setup the idle timer
981 if (IsMaster() && !fIdleTOTimer) {
982 // Idle timeout in seconds from 'ProofServ.IdleTimeout'; the timer is created only if it is positive
983 Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
984 if (idle_to > 0) {
985 fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
987 if (gProofDebugLevel > 0)
988 Info("CreateServer", " idle timer started (%d secs)", idle_to);
989 } else if (gProofDebugLevel > 0) {
990 Info("CreateServer", " idle timer not started (no idle timeout requested)");
991 }
992 }
993
994 // Done
995 return 0;
996}
997
998////////////////////////////////////////////////////////////////////////////////
999/// Cleanup. Not really necessary since after this dtor there is no
1000/// life anyway.
1001
1003{
1011 close(fLogFileDes);
1012}
1013
1014////////////////////////////////////////////////////////////////////////////////
1015/// Print message of the day (in the file pointed by the env PROOFMOTD
1016/// or from fConfDir/etc/proof/motd). The motd is not shown more than
1017/// once a day. If the file pointed by env PROOFNOPROOF exists (or the
1018/// file fConfDir/etc/proof/noproof exists), show its contents and close
1019/// the connection.
1020
1022{
1023 TString lastname;
1024 FILE *motd;
1025 Bool_t show = kFALSE;
1026
1027 // If we are disabled just print the message and close the connection
1028 TString motdname(GetConfDir());
1029 // The env variable PROOFNOPROOF allows to put the file in an alternative
1030 // location not overwritten by a new installation
1031 if (gSystem->Getenv("PROOFNOPROOF")) {
1032 motdname = gSystem->Getenv("PROOFNOPROOF");
1033 } else {
1034 motdname += "/etc/proof/noproof";
1035 }
1036 if ((motd = fopen(motdname, "r"))) {
1037 Int_t c;
1038 printf("\n");
1039 while ((c = getc(motd)) != EOF)
1040 putchar(c);
1041 fclose(motd);
1042 printf("\n");
1043
1044 return -1;
1045 }
1046
1047 // get last modification time of the file ~/proof/.prooflast
1048 lastname = TString(GetWorkDir()) + "/.prooflast";
1049 gSystem->ExpandPathName(lastname);
1050 Long64_t size;
1051 Long_t id, flags, modtime, lasttime = 0;
1052 if (gSystem->GetPathInfo(lastname.Data(), &id, &size, &flags, &lasttime) == 1)
1053 lasttime = 0;
1054
1055 // show motd at least once per day
1056 if (time(0) - lasttime > (time_t)86400)
1057 show = kTRUE;
1058
1059 // The env variable PROOFMOTD allows to put the file in an alternative
1060 // location not overwritten by a new installation
1061 if (gSystem->Getenv("PROOFMOTD")) {
1062 motdname = gSystem->Getenv("PROOFMOTD");
1063 } else {
1064 motdname = GetConfDir();
1065 motdname += "/etc/proof/motd";
1066 }
1067 if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
1068 if (modtime > lasttime || show) {
1069 if ((motd = fopen(motdname, "r"))) {
1070 Int_t c;
1071 printf("\n");
1072 while ((c = getc(motd)) != EOF)
1073 putchar(c);
1074 fclose(motd);
1075 printf("\n");
1076 }
1077 }
1078 }
1079
1080 if (lasttime)
1081 gSystem->Unlink(lastname.Data());
1082 Int_t fd = creat(lastname.Data(), 0600);
1083 if (fd >= 0) close(fd);
1084
1085 return 0;
1086}
1087
1088////////////////////////////////////////////////////////////////////////////////
1089/// Get object with name "name;cycle" (e.g. "aap;2") from master or client.
1090/// This method is called by TDirectory::Get() in case the object can not
1091/// be found locally.
1092
1093TObject *TProofServ::Get(const char *namecycle)
1094{
1095 if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
1096 Error("Get", "problems sending request");
1097 return (TObject *)0;
1098 }
1099
1100 TObject *idcur = 0;
1101
1102 Bool_t notdone = kTRUE;
1103 while (notdone) {
1104 TMessage *mess = 0;
1105 if (fSocket->Recv(mess) < 0)
1106 return 0;
1107 Int_t what = mess->What();
1108 if (what == kMESS_OBJECT) {
1109 idcur = mess->ReadObject(mess->GetClass());
1110 notdone = kFALSE;
1111 } else {
1112 Int_t xrc = HandleSocketInput(mess, kFALSE);
1113 if (xrc == -1) {
1114 Error("Get", "command %d cannot be executed while processing", what);
1115 } else if (xrc == -2) {
1116 Error("Get", "unknown command %d ! Protocol error?", what);
1117 }
1118 }
1119 delete mess;
1120 }
1121
1122 return idcur;
1123}
1124
1125////////////////////////////////////////////////////////////////////////////////
1126/// Reset the compute time
1127
1129{
1130 fCompute.Stop();
1131 if (fPlayer) {
1133 if (status) status->SetLearnTime(fCompute.RealTime());
1134 Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
1136 }
1138}
1139
1140////////////////////////////////////////////////////////////////////////////////
1141/// Get next range of entries to be processed on this server.
1142
1144{
1145 Long64_t bytesRead = 0;
1146
1147 if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
1148
1149 if (fCompute.Counter() > 0)
1150 fCompute.Stop();
1151
1153 Double_t cputime = fCompute.CpuTime();
1154 Double_t realtime = fCompute.RealTime();
1155
1156 if (fProtocol > 18) {
1157 req << fLatency.RealTime();
1158 TProofProgressStatus *status = 0;
1159 if (fPlayer) {
1161 status = fPlayer->GetProgressStatus();
1162 } else {
1163 Error("GetNextPacket", "no progress status object");
1164 return 0;
1165 }
1166 // the CPU and wallclock proc times are kept in the TProofServ and here
1167 // added to the status object in the fPlayer.
1168 if (status->GetEntries() > 0) {
1169 PDB(kLoop, 2) status->Print(GetOrdinal());
1170 status->IncProcTime(realtime);
1171 status->IncCPUTime(cputime);
1172 }
1173 // Flag cases with problems in opening files
1174 if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
1175 // Add to the message
1176 req << status;
1177 // Send tree cache information
1178 Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
1179 Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
1180 req << cacheSize << learnent;
1181
1182 // Send over the number of entries in the file, used by packetizers that do not rely
1183 // on initial validation. Also, -1 means that the file could not be opened, which is
1184 // used to flag files as missing
1185 req << totalEntries;
1186
1187 // Send the time spent in saving the partial result to file
1188 if (fProtocol > 34) req << fSaveOutput.RealTime();
1189
1190 PDB(kLoop, 1) {
1191 PDB(kLoop, 2) status->Print();
1192 Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
1193 }
1194 // Reset the status bits
1197 status = 0; // status is owned by the player.
1198 } else {
1199 req << fLatency.RealTime() << realtime << cputime
1200 << bytesRead << totalEntries;
1201 if (fPlayer)
1202 req << fPlayer->GetEventsProcessed();
1203 }
1204
1205 fLatency.Start();
1206 Int_t rc = fSocket->Send(req);
1207 if (rc <= 0) {
1208 Error("GetNextPacket","Send() failed, returned %d", rc);
1209 return 0;
1210 }
1211
1212 // Save the current output
1213 if (fPlayer) {
1216 Warning("GetNextPacket", "problems saving partial results");
1217 fSaveOutput.Stop();
1218 }
1219
1220 TDSetElement *e = 0;
1221 Bool_t notdone = kTRUE;
1222 while (notdone) {
1223
1224 TMessage *mess;
1225 if ((rc = fSocket->Recv(mess)) <= 0) {
1226 fLatency.Stop();
1227 Error("GetNextPacket","Recv() failed, returned %d", rc);
1228 return 0;
1229 }
1230
1231 Int_t xrc = 0;
1232 TString file, dir, obj;
1233
1234 Int_t what = mess->What();
1235
1236 switch (what) {
1237 case kPROOF_GETPACKET:
1238
1239 fLatency.Stop();
1240 (*mess) >> e;
1241 if (e != 0) {
1242 fCompute.Start();
1243 PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
1244 e->GetFileName(), e->GetDirectory(),
1245 e->GetObjName(), e->GetFirst(),e->GetNum());
1246 } else {
1247 PDB(kLoop, 2) Info("GetNextPacket", "Done");
1248 }
1249 notdone = kFALSE;
1250 break;
1251
1252 case kPROOF_STOPPROCESS:
1253 // if a kPROOF_STOPPROCESS message is returned to kPROOF_GETPACKET
1254 // GetNextPacket() will return 0 and the TPacketizer and hence
1255 // TEventIter will be stopped
1256 fLatency.Stop();
1257 PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
1258 break;
1259
1260 default:
1261 xrc = HandleSocketInput(mess, kFALSE);
1262 if (xrc == -1) {
1263 Error("GetNextPacket", "command %d cannot be executed while processing", what);
1264 } else if (xrc == -2) {
1265 Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
1266 }
1267 break;
1268 }
1269
1270 delete mess;
1271
1272 }
1273
1274 // Done
1275 return e;
1276}
1277
1278////////////////////////////////////////////////////////////////////////////////
1279/// Get and handle command line options. Fixed format:
1280/// "proofserv"|"proofslave" `<confdir>`
/// argv[1] must be "proofserv" or "proofslave" and is stored in fService; the
/// configuration directory is read from the ROOTCONFDIR environment variable
/// into fConfDir. Fatal() followed by exit(1) is called when the arguments are
/// missing, the service name is not recognized or ROOTCONFDIR is not set.
// NOTE(review): this listing skips lines 1298 and 1301-1302, so part of the
// two service branches is not visible here.
1281
1282void TProofServ::GetOptions(Int_t *argc, char **argv)
1283{
 // Test mode is requested by passing "test" as argv[3]
1284 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
1285
1286 // If test and tty
 // (i.e. isatty() is non-zero for both descriptor 0 and descriptor 1)
1287 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1288 Printf("proofserv: command line testing: OK");
1289 exit(0);
1290 }
1291
 // The inner 'argc &&' is redundant: the right-hand side of '||' is only
 // evaluated when argc is non-null
1292 if (!argc || (argc && *argc <= 1)) {
1293 Fatal("GetOptions", "Must be started from proofd with arguments");
1294 exit(1);
1295 }
1296
1297 if (!strcmp(argv[1], "proofserv")) {
1299 fEndMaster = kTRUE;
1300 } else if (!strcmp(argv[1], "proofslave")) {
1303 } else {
1304 Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
1305 exit(1);
1306 }
1307
1308 fService = argv[1];
1309
1310 // Confdir
1311 if (!(gSystem->Getenv("ROOTCONFDIR"))) {
1312 Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
1313 exit(1);
1314 }
1315 fConfDir = gSystem->Getenv("ROOTCONFDIR");
1316}
1317
1318////////////////////////////////////////////////////////////////////////////////
1319/// Handle input coming from the client or from the master server.
/// Receives one message from fSocket and passes it to
/// HandleSocketInput(mess, all). A message for which that call returns 2 is
/// appended to fQueuedMsg; queued messages are processed, with 'all' set, only
/// by the outermost invocation (fgRecursive == 1). Exceptions caught while
/// processing are logged, reported via SendAsynMessage() and followed by
/// Terminate(0).
// NOTE(review): this listing skips lines 1321, 1324, 1335, 1398, 1414, 1429,
// 1446, 1451 and 1453, so the signature and several statements (including the
// condition opening the high-memory block and the declaration of 'ngwrks')
// are not visible here.
1320
1322{
1323 // The idle timeout guard: stops the timer and restarts when we return from here
1325
 // Only the outermost invocation handles every message type; nested
 // (recursive) invocations pass all = kFALSE
1326 Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
1327 fgRecursive++;
1328
 // NOTE(review): 'mess' has no initializer; the final SafeDelete(mess) relies
 // on Recv() having assigned it, also on the exception paths - confirm.
1329 TMessage *mess;
1330 Int_t rc = 0;
1331 TString exmsg;
1332
1333 // Check log file length (before the action, so we have the chance to keep the
1334 // latest logs)
1336
1337 try {
1338
1339 // Get message
1340 if (fSocket->Recv(mess) <= 0 || !mess) {
1341 // Pending: do something more intelligent here
1342 // but at least get a message in the log file
1343 Error("HandleSocketInput", "retrieving message from input socket");
1344 Terminate(0);
 // NOTE(review): fgRecursive is not decremented on this early return.
1345 return;
1346 }
1347 Int_t what = mess->What();
1348 PDB(kCollect, 1)
1349 Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
1350
1351 fNcmd++;
1352
1353 if (fProof) fProof->SetActive();
1354
1355 Bool_t doit = kTRUE;
1356
1357 while (doit) {
1358
1359 // Process the message
1360 rc = HandleSocketInput(mess, all);
1361 if (rc < 0) {
 // Negative return code: report the failure asynchronously
1362 TString emsg;
1363 if (rc == -1) {
1364 emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
1365 } else if (rc == -3) {
1366 emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
1367 } else {
1368 emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
1369 }
1370 SendAsynMessage(emsg.Data());
1371 } else if (rc == 2) {
1372 // Add to the queue
1373 fQueuedMsg->Add(mess);
1374 PDB(kGlobal, 1)
1375 Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
1376 what, fQueuedMsg->GetSize());
 // The queue now holds the message: drop the local pointer so that it is
 // not deleted below
1377 mess = 0;
1378 }
1379
1380 // Still something to do?
1381 doit = 0;
1382 if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
1383 // Take the next message from the queue
 // ('what' printed below is still the type of the message received above)
1384 PDB(kCollect, 1)
1385 Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
1386 what, fQueuedMsg->GetSize());
1387 all = 1;
1388 SafeDelete(mess);
1389 mess = (TMessage *) fQueuedMsg->First();
1390 if (mess) fQueuedMsg->Remove(mess);
1391 doit = 1;
1392 }
1393 }
1394
1395 } catch (std::bad_alloc &) {
1396 // Memory allocation problem:
1397 exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s %lld",
1399 } catch (std::exception &exc) {
1400 // Standard exception caught
1401 exmsg.Form("caught standard exception '%s' %s %lld",
1402 exc.what(), fgLastMsg.Data(), fgLastEntry);
1403 } catch (int i) {
1404 // Other exception caught
1405 exmsg.Form("caught exception throwing %d %s %lld",
1406 i, fgLastMsg.Data(), fgLastEntry);
1407 } catch (const char *str) {
1408 // Other exception caught
1409 exmsg.Form("caught exception throwing '%s' %s %lld",
1410 str, fgLastMsg.Data(), fgLastEntry);
1411 } catch (...) {
1412 // Caught other exception
1413 exmsg.Form("caught exception <unknown> %s %lld",
1415 }
1416
1417 // Terminate on exception
1418 if (!exmsg.IsNull()) {
1419 // Save info in the log file too
1420 Error("HandleSocketInput", "%s", exmsg.Data());
1421 // Try to warn the user
1422 SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1423 // Terminate
1424 Terminate(0);
1425 }
1426
1427 // Terminate also if a high memory footprint was detected before the related
1428 // exception was thrown
1430 // Save info in the log file too
1431 exmsg.Form("high-memory footprint detected during Process(...) - terminating");
1432 Error("HandleSocketInput", "%s", exmsg.Data());
1433 // Try to warn the user
1434 SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1435 // Terminate
1436 Terminate(0);
1437 }
1438
1439 fgRecursive--;
1440
1441 if (fProof) {
1442 // If something wrong went on during processing and we do not have
1443 // any worker anymore, we shutdown this session
1444 Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
1445 Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
1447 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1448 SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
1449 Terminate(0);
1450 }
1452 // Reset PROOF to running state
1454 }
1455
1456 // Cleanup
1457 SafeDelete(mess);
1458}
1459
1460////////////////////////////////////////////////////////////////////////////////
1461/// Process input coming from the client or from the master server.
1462/// If 'all' is kFALSE, process only those messages that can be handled
1463/// during query processing.
1464/// Returns -1 if the message could not be processed, <-1 if something went
1465/// wrong. Returns 1 if the action may have changed the parallel state.
1466/// Returns 2 if the message has to be enqueued.
1467/// Returns 0 otherwise
/// In the code visible here the values below -1 are -2 (unknown command) and
/// -3 ('mess' is null).
// NOTE(review): this listing skips many lines of this function (the signature
// at 1469, several 'case' labels and various declarations such as the reply
// messages 'm'/'answ'), so those parts are not visible here.
1468
1470{
1471 static TStopwatch timer;
1472 char str[2048];
1473 Bool_t aborted = kFALSE;
1474
1475 if (!mess) return -3;
1476
1477 Int_t what = mess->What();
1478 PDB(kCollect, 1)
1479 Info("HandleSocketInput", "processing message type %d from '%s'",
1480 what, fSocket->GetTitle());
1481
 // Time spent in this call is added to fRealTime / fCpuTime after the switch
1482 timer.Start();
1483
1484 Int_t rc = 0, lirc = 0;
1485 TString slb;
 // 'pslb' is non-null only when fgLogToSysLog > 0; it is handed to the
 // handlers below and 'slb' ends up in the summary string built at the end
1486 TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
1487
1488 switch (what) {
1489
1490 case kMESS_CINT:
1491 if (all) {
1492 mess->ReadString(str, sizeof(str));
1493 // Make sure that the relevant files are available
1494 TString fn;
1495
1496 Bool_t hasfn = TProof::GetFileInCmd(str, fn);
1497
1498 if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
1499 fProof->SendCommand(str);
1500 } else {
1501 PDB(kGlobal, 1)
1502 Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
1503 TString ocwd;
1504 if (hasfn) {
 // Hold the cache lock and remember the current directory while the
 // command is processed; both are restored after ProcessLine()
1505 fCacheLock->Lock();
1506 ocwd = gSystem->WorkingDirectory();
1508 }
1509 ProcessLine(str);
1510 if (hasfn) {
1511 gSystem->ChangeDirectory(ocwd);
1512 fCacheLock->Unlock();
1513 }
1514 }
1515
1516 LogToMaster();
1517 } else {
1518 rc = -1;
1519 }
1520 SendLogFile();
1521 if (pslb) slb = str;
1522 break;
1523
1524 case kMESS_STRING:
1525 if (all) {
1526 mess->ReadString(str, sizeof(str));
1527 } else {
1528 rc = -1;
1529 }
1530 break;
1531
1532 case kMESS_OBJECT:
1533 if (all) {
1534 mess->ReadObject(mess->GetClass());
1535 } else {
1536 rc = -1;
1537 }
1538 break;
1539
1540 case kPROOF_GROUPVIEW:
1541 if (all) {
1542 mess->ReadString(str, sizeof(str));
1543 // coverity[secure_coding]
1544 sscanf(str, "%d %d", &fGroupId, &fGroupSize);
1545 } else {
1546 rc = -1;
1547 }
1548 break;
1549
1550 case kPROOF_LOGLEVEL:
1551 { UInt_t mask;
1552 mess->ReadString(str, sizeof(str));
1553 sscanf(str, "%d %u", &fLogLevel, &mask);
1554 Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
1557 if (levelchanged)
1558 Info("HandleSocketInput:kPROOF_LOGLEVEL", "debug level set to %d (mask: 0x%x)",
1560 if (IsMaster())
1562 }
1563 break;
1564
1565 case kPROOF_PING:
1566 { if (IsMaster())
1567 fProof->Ping();
1568 // do nothing (ping is already acknowledged)
1569 }
1570 break;
1571
1572 case kPROOF_PRINT:
1573 mess->ReadString(str, sizeof(str));
1574 Print(str);
1575 LogToMaster();
1576 SendLogFile();
1577 break;
1578
1579 case kPROOF_RESET:
1580 if (all) {
1581 mess->ReadString(str, sizeof(str));
1582 Reset(str);
1583 } else {
1584 rc = -1;
1585 }
1586 break;
1587
1588 case kPROOF_STATUS:
1589 Warning("HandleSocketInput:kPROOF_STATUS",
1590 "kPROOF_STATUS message is obsolete");
1592 Warning("HandleSocketInput:kPROOF_STATUS", "problem sending of request");
1593 break;
1594
1595 case kPROOF_GETSTATS:
1597 break;
1598
1599 case kPROOF_GETPARALLEL:
1600 SendParallel();
1601 break;
1602
1603 case kPROOF_STOP:
1604 if (all) {
1605 if (IsMaster()) {
 // Master: the message carries the ordinal of the worker to terminate
1606 TString ord;
1607 *mess >> ord;
1608 PDB(kGlobal, 1)
1609 Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
1610 if (fProof) fProof->TerminateWorker(ord);
1611 } else {
1612 PDB(kGlobal, 1)
1613 Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
1614 Terminate(0);
1615 }
1616 } else {
1617 rc = -1;
1618 }
1619 break;
1620
1621 case kPROOF_STOPPROCESS:
1622 if (all) {
1623 // this message makes only sense when the query is being processed,
1624 // however the message can also be received if the user pressed
1625 // ctrl-c, so ignore it!
1626 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
1627 } else {
1628 Long_t timeout = -1;
1629 (*mess) >> aborted;
 // The timeout is read from the message only for protocol > 9
1630 if (fProtocol > 9)
1631 (*mess) >> timeout;
1632 PDB(kGlobal, 1)
1633 Info("HandleSocketInput:kPROOF_STOPPROCESS",
1634 "recursive mode: enter %d, %ld", aborted, timeout);
1635 if (fProof)
1636 // On the master: propagate further
1637 fProof->StopProcess(aborted, timeout);
1638 else
1639 // Worker: actually stop processing
1640 if (fPlayer)
1641 fPlayer->StopProcess(aborted, timeout);
1642 }
1643 break;
1644
1645 case kPROOF_PROCESS:
1646 {
1648 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
1649 HandleProcess(mess, pslb);
1650 // The log file is send either in HandleProcess or HandleSubmergers.
1651 // The reason is that the order of various messages depend on the
1652 // processing mode (sync/async) and/or merging mode
1653 }
1654 break;
1655
1656 case kPROOF_SENDOUTPUT:
1657 {
1658 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
1659 "worker was asked to send output to master");
1660 Int_t sorc = 0;
 // NOTE(review): fPlayer is dereferenced without a null check here.
1661 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
1662 Error("HandleSocketInput:kPROOF_SENDOUTPUT", "problems sending output list");
1663 sorc = 1;
1664 }
1665 // Signal the master that we are idle
1667 SetIdle(kTRUE);
1668 DeletePlayer();
1669 SendLogFile(sorc);
1670 }
1671 break;
1672
1673 case kPROOF_QUERYLIST:
1674 {
1675 HandleQueryList(mess);
1676 // Notify
1677 SendLogFile();
1678 }
1679 break;
1680
1681 case kPROOF_REMOVE:
1682 {
1683 HandleRemove(mess, pslb);
1684 // Notify
1685 SendLogFile();
1686 }
1687 break;
1688
1689 case kPROOF_RETRIEVE:
1690 {
1691 HandleRetrieve(mess, pslb);
1692 // Notify
1693 SendLogFile();
1694 }
1695 break;
1696
1697 case kPROOF_ARCHIVE:
1698 {
1699 HandleArchive(mess, pslb);
1700 // Notify
1701 SendLogFile();
1702 }
1703 break;
1704
1705 case kPROOF_MAXQUERIES:
1706 { PDB(kGlobal, 1)
1707 Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
1709 m << fMaxQueries;
1710 fSocket->Send(m);
1711 // Notify
1712 SendLogFile();
1713 }
1714 break;
1715
 // (the 'case' label for this branch, listing line 1716, is not visible here)
1717 if (all) {
1718 PDB(kGlobal, 1)
1719 Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
1720 TString stag;
1721 (*mess) >> stag;
1722 if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
1723 Printf("Session %s cleaned up", stag.Data());
1724 } else {
1725 Printf("Could not cleanup session %s", stag.Data());
1726 }
1727 } else {
1728 rc = -1;
1729 }
1730 // Notify
1731 SendLogFile();
1732 break;
1733
1734 case kPROOF_GETENTRIES:
1735 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
1736 Bool_t isTree;
1738 TString dir;
1739 TString objname("undef");
1740 Long64_t entries = -1;
1741
1742 if (all) {
1743 (*mess) >> isTree >> filename >> dir >> objname;
1744 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1745 "Report size of object %s (%s) in dir %s in file %s",
1746 objname.Data(), isTree ? "T" : "O",
1747 dir.Data(), filename.Data());
1748 entries = TDSet::GetEntries(isTree, filename, dir, objname);
1749 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1750 "Found %lld %s", entries, isTree ? "entries" : "objects");
1751 } else {
1752 rc = -1;
1753 }
 // The answer is sent in both cases; with all == kFALSE it carries the
 // initial values (-1, "undef")
1755 answ << entries << objname;
1756 SendLogFile(); // in case of error messages
1757 fSocket->Send(answ);
1758 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
1759 }
1760 break;
1761
1762 case kPROOF_CHECKFILE:
1763 if (!all && fProtocol <= 19) {
1764 // Come back later
1765 rc = 2;
1766 } else {
1767 // Handle file checking request
1768 HandleCheckFile(mess, pslb);
1769 FlushLogFile(); // Avoid sending (error) messages at next action
1770 }
1771 break;
1772
1773 case kPROOF_SENDFILE:
1774 if (!all && fProtocol <= 19) {
1775 // Come back later
1776 rc = 2;
1777 } else {
1778 mess->ReadString(str, sizeof(str));
1779 Long_t size;
1780 Int_t bin, fw = 1;
1781 char name[1024];
 // NOTE(review): the sscanf() return value is not checked, so 'name',
 // 'bin' and 'size' stay uninitialized if the string does not match.
1782 if (fProtocol > 5) {
1783 sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
1784 } else {
1785 sscanf(str, "%1023s %d %ld", name, &bin, &size);
1786 }
1787 TString fnam(name);
1788 Bool_t copytocache = kTRUE;
 // A "cache:" prefix is replaced by the cache directory path and disables
 // the copy-to-cache step below
1789 if (fnam.BeginsWith("cache:")) {
1790 fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
1791 copytocache = kFALSE;
1792 }
1793
1794 Int_t rfrc = 0;
1795 if (size > 0) {
1796 rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
1797 } else {
1798 // Take it from the cache
1799 if (!fnam.BeginsWith(fCacheDir.Data())) {
1800 fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
1801 }
1802 }
1803 if (rfrc == 0) {
1804 // copy file to cache if not a PAR file
 // NOTE(review): 'fnam' originates from the received message and is
 // interpolated unquoted into a shell command; confirm that the sender
 // is trusted or that the name is sanitized elsewhere.
1805 if (copytocache && size > 0 && !fPackMgr->IsInDir(name))
1806 gSystem->Exec(TString::Format("%s %s %s", kCP, fnam.Data(), fCacheDir.Data()));
1807 if (IsMaster() && fw == 1) {
1809 if (bin)
1810 opt |= TProof::kBinary;
1811 PDB(kGlobal, 1)
1812 Info("HandleSocketInput","forwarding file: %s", fnam.Data());
1813 if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
1814 Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
1815 }
1816 }
1818 } else {
1819 // There was an error
1820 SendLogFile(1);
1821 }
1822 }
1823 break;
1824
1825 case kPROOF_LOGFILE:
1826 {
1827 Int_t start, end;
1828 (*mess) >> start >> end;
1829 PDB(kGlobal, 1)
1830 Info("HandleSocketInput:kPROOF_LOGFILE",
1831 "Logfile request - byte range: %d - %d", start, end);
1832
1833 LogToMaster();
1834 SendLogFile(0, start, end);
1835 }
1836 break;
1837
1838 case kPROOF_PARALLEL:
1839 if (all) {
1840 if (IsMaster()) {
1841 Int_t nodes;
1842 Bool_t random = kFALSE;
1843 (*mess) >> nodes;
 // 'random' is read only if the buffer size exceeds the current length
1844 if ((mess->BufferSize() > mess->Length()))
1845 (*mess) >> random;
1846 if (fProof) fProof->SetParallel(nodes, random);
1847 rc = 1;
1848 }
1849 } else {
1850 rc = -1;
1851 }
1852 // Notify
1853 SendLogFile();
1854 break;
1855
1856 case kPROOF_CACHE:
1857 if (!all && fProtocol <= 19) {
1858 // Come back later
1859 rc = 2;
1860 } else {
1862 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
1863 Int_t hcrc = HandleCache(mess, pslb);
1864 // Notify
1865 SendLogFile(hcrc);
1866 }
1867 break;
1868
1869 case kPROOF_WORKERLISTS:
1870 { Int_t wlrc = -1;
1871 if (all) {
1872 if (IsMaster())
1873 wlrc = HandleWorkerLists(mess);
1874 else
1875 Warning("HandleSocketInput:kPROOF_WORKERLISTS",
1876 "Action meaning-less on worker nodes: protocol error?");
1877 } else {
1878 rc = -1;
1879 }
1880 // Notify
1881 SendLogFile(wlrc);
1882 }
1883 break;
1884
 // (the 'case' label for this branch, listing line 1885, is not visible here)
1886 if (all) {
1887 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
1888 if (IsMaster()) {
1889
1890 Bool_t ok = kTRUE;
1891 // if the session does not have workers and is in the dynamic mode
1892 if (fProof->UseDynamicStartup()) {
1893 ok = kFALSE;
1894 // get a list of workers and start them
1895 Int_t pc = 0;
1896 TList* workerList = new TList();
1897 EQueryAction retVal = GetWorkers(workerList, pc);
1898 if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
1899 Int_t ret = fProof->AddWorkers(workerList);
1900 if (ret < 0) {
1901 Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1902 "adding a list of worker nodes returned: %d", ret);
1903 }
1904 } else {
1905 Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1906 "getting list of worker nodes returned: %d", retVal);
1907 }
 // 'ok' is set back to kTRUE on every path through this block
1908 ok = kTRUE;
1909 }
1910 if (ok) {
1911 TList *info = fProof->GetListOfSlaveInfos();
1913 answ << info;
1914 fSocket->Send(answ);
1915 // stop the workers
1917 }
1918 } else {
 // Not a master: answer with a one-element list describing this node
1920 TList *info = new TList;
1922 SysInfo_t si;
1923 gSystem->GetSysInfo(&si);
1924 wi->SetSysInfo(si);
1925 info->Add(wi);
1926 answ << (TList *)info;
1927 fSocket->Send(answ);
1928 info->SetOwner(kTRUE);
1929 delete info;
1930 }
1931
1932 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
1933 } else {
 // Not allowed now: still answer, with a null list
1935 answ << (TList *)0;
1936 fSocket->Send(answ);
1937 rc = -1;
1938 }
1939 break;
1940
 // (the 'case' label for this branch, listing line 1941, is not visible here)
1942 if (all) {
1943 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
1944
1946 if (p) {
1947 p->HandleGetTreeHeader(mess);
1948 delete p;
1949 } else {
1950 Error("HandleSocketInput:kPROOF_GETTREEHEADER", "could not create TProofPlayer instance!");
1951 }
1952
1953 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
1954 } else {
1956 answ << TString("Failed") << (TObject *)0;
1957 fSocket->Send(answ);
1958 rc = -1;
1959 }
1960 break;
1961
 // (the 'case' label for this branch, listing line 1962, is not visible here)
1963 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
1964 TList* outputList = 0;
 // NOTE(review): on the master the list returned by fProof->GetOutputList()
 // is made owner and deleted below, and on the non-master branch fProof is
 // dereferenced without a check; confirm ownership and non-null fProof.
1965 if (IsMaster()) {
1966 outputList = fProof->GetOutputList();
1967 if (!outputList)
1968 outputList = new TList();
1969 } else {
1970 outputList = new TList();
1971 if (fProof->GetPlayer()) {
 // Only the object names are sent, as TNamed placeholders
1972 TList *olist = fProof->GetPlayer()->GetOutputList();
1973 TIter next(olist);
1974 TObject *o;
1975 while ( (o = next()) ) {
1976 outputList->Add(new TNamed(o->GetName(), ""));
1977 }
1978 }
1979 }
1980 outputList->SetOwner();
1982 answ << outputList;
1983 fSocket->Send(answ);
1984 delete outputList;
1985 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
1986 }
1987 break;
1988
 // (the 'case' label for this branch, listing line 1989, is not visible here)
1990 if (all) {
1991 PDB(kGlobal, 1)
1992 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
1993
1994 TDSet* dset = 0;
1995 (*mess) >> dset;
1996
 // NOTE(review): 'dset' is used without a null check after streaming.
1997 if (IsMaster()) fProof->ValidateDSet(dset);
1998 else dset->Validate();
1999
2001 answ << dset;
2002 fSocket->Send(answ);
2003 delete dset;
2004 PDB(kGlobal, 1)
2005 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
2006 } else {
2007 rc = -1;
2008 }
2009 // Notify
2010 SendLogFile();
2011 break;
2012
2013 case kPROOF_DATA_READY:
2014 if (all) {
2015 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
2017 if (IsMaster()) {
2018 Long64_t totalbytes = 0, bytesready = 0;
2019 Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
2020 answ << dataready << totalbytes << bytesready;
2021 } else {
2022 Error("HandleSocketInput:kPROOF_DATA_READY",
2023 "This message should not be sent to slaves");
2024 answ << kFALSE << Long64_t(0) << Long64_t(0);
2025 }
2026 fSocket->Send(answ);
2027 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
2028 } else {
 // Not allowed now: still answer, with "not ready" and zero byte counts
2030 answ << kFALSE << Long64_t(0) << Long64_t(0);
2031 fSocket->Send(answ);
2032 rc = -1;
2033 }
2034 // Notify
2035 SendLogFile();
2036 break;
2037
2038 case kPROOF_DATASETS:
2039 { Int_t dsrc = -1;
2040 if (fProtocol > 16) {
2041 dsrc = HandleDataSets(mess, pslb);
2042 } else {
2043 Error("HandleSocketInput", "old client: no or incompatible dataset support");
2044 }
2045 SendLogFile(dsrc);
2046 }
2047 break;
2048
2049 case kPROOF_SUBMERGER:
2050 { HandleSubmerger(mess);
2051 }
2052 break;
2053
 // (the 'case' label for this branch, listing line 2054, is not visible here)
2055 if (all) {
2056 lirc = HandleLibIncPath(mess);
2057 } else {
2058 rc = -1;
2059 }
2060 // Notify the client
2061 if (lirc > 0) SendLogFile();
2062 break;
2063
2064 case kPROOF_REALTIMELOG:
2065 { Bool_t on;
2066 (*mess) >> on;
2067 PDB(kGlobal, 1)
2068 Info("HandleSocketInput:kPROOF_REALTIMELOG",
2069 "setting real-time logging %s", (on ? "ON" : "OFF"));
2070 fRealTimeLog = on;
2071 // Forward the request to lower levels
2072 if (IsMaster())
2074 }
2075 break;
2076
2077 case kPROOF_FORK:
2078 if (all) {
2079 HandleFork(mess);
2080 LogToMaster();
2081 } else {
2082 rc = -1;
2083 }
2084 SendLogFile();
2085 break;
2086
 // (the 'case' label for this branch, listing line 2087, is not visible here)
2088 if (all) {
2089 // This message resumes the session; should not come during processing.
2090
2091 if (WaitingQueries() == 0) {
2092 Error("HandleSocketInput", "no queries enqueued");
2093 break;
2094 }
2095
2096 // Similar to handle process
2097 // get the list of workers and start them
2098 TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
2099 Int_t pc = 0;
2100 EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
2101
2102 if (retVal == TProofServ::kQueryOK) {
2103 Int_t ret = 0;
2104 if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
2105 Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
2106 } else {
2107 ProcessNext(pslb);
2108 // Set idle
2109 SetIdle(kTRUE);
2110 // Signal the client that we are idle
2112 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
2113 m << waiting;
2114 fSocket->Send(m);
2115 }
2116 } else {
 // NOTE(review): as written, "query was re-queued!" is reported for every
 // value other than kQueryStop/kQueryEnqueued, while kQueryEnqueued itself
 // falls into the "unexpected answer" branch; the '!=' below looks
 // inverted - confirm.
2117 if (retVal == TProofServ::kQueryStop) {
2118 Error("HandleSocketInput", "error getting list of worker nodes");
2119 } else if (retVal != TProofServ::kQueryEnqueued) {
2120 Warning("HandleSocketInput", "query was re-queued!");
2121 } else {
2122 Error("HandleSocketInput", "unexpected answer: %d", retVal);
2123 break;
2124 }
2125 }
2126
2127 }
2128 break;
2129
2130 case kPROOF_GOASYNC:
2131 { // The client requested to switch to asynchronous mode:
2132 // communicate the sequential number of the running query for later
2133 // identification, if any
2134 if (!IsIdle() && fPlayer) {
2135 // Get query currently being processed
2138 m << pq->GetSeqNum() << kFALSE;
2139 fSocket->Send(m);
2140 } else {
2141 // Idle or undefined: nothing to do; ignore
2142 SendAsynMessage("Processing request to go asynchronous:"
2143 " idle or undefined player - ignoring");
2144 }
2145 }
2146 break;
2147
2148 case kPROOF_ECHO:
2149 { // Echo request: an object has been sent along. If the object is a
2150 // string, it is simply echoed back to the client from the master
2151 // and each worker. Elsewhere, the output of TObject::Print() is
2152 // sent. Received object is disposed after usage.
2153
2154 TObject *obj = mess->ReadObject(0x0); // class type ignored
2155
2156 if (IsMaster()) {
2157 // We are on master
2158 // dbTODO: forward on dynamic startup when wrks are up
2159 if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
2160 fProof->Echo(obj); // forward to lower layer
2161 }
2162 }
2163
2165 TString smsg;
2166
 // NOTE(review): 'obj' is dereferenced below without a null check.
2167 if (obj->InheritsFrom(TObjString::Class())) {
2168 // It's a string: echo it
2169 smsg.Form("Echo response from %s:%s: %s",
2171 ((TObjString *)obj)->String().Data());
2172 }
2173 else {
2174 // Not a string: collect Print() output and send it
2175
2176 // Output to tempfile
2177 TString tmpfn = "echo-out-";
2178 FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
2179 if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
2180 Error("HandleSocketInput", "Can't redirect output");
2181 if (tf) {
2182 fclose(tf);
2183 gSystem->Unlink(tmpfn);
2184 }
2185 rc = -1;
2186 delete obj;
2187 break;
2188 }
2189 //cout << obj->ClassName() << endl;
2190 obj->Print();
2191 gSystem->RedirectOutput(0x0); // restore
2192 fclose(tf);
2193
2194 // Read file back and send it via message
2195 smsg.Form("*** Echo response from %s:%s ***\n",
2197 TMacro *fr = new TMacro();
2198 fr->ReadFile(tmpfn);
2199 TIter nextLine(fr->GetListOfLines());
2201 while (( line = (TObjString *)nextLine() )) {
2202 smsg.Append( line->String() );
2203 }
2204
2205 // Close the reader (TMacro) and remove file
2206 delete fr;
2207 gSystem->Unlink(tmpfn);
2208 }
2209
2210 // Send message and dispose object
2211 rmsg << smsg;
2212 GetSocket()->Send(rmsg);
2213 delete obj;
2214 }
2215 break;
2216
2217 default:
2218 Error("HandleSocketInput", "unknown command %d", what);
2219 rc = -2;
2220 break;
2221 }
2222
 // Account the time spent handling this message
2223 fRealTime += (Float_t)timer.RealTime();
2224 fCpuTime += (Float_t)timer.CpuTime();
2225
 // Build the summary string when a handler filled 'slb' or fgLogToSysLog > 1
2226 if (!(slb.IsNull()) || fgLogToSysLog > 1) {
2227 TString s;
2228 s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
2229 what, timer.RealTime(), timer.CpuTime(), slb.Data());
2231 }
2232
2233 // Done
2234 return rc;
2235}
2236
2237////////////////////////////////////////////////////////////////////////////////
2238/// Accept and merge results from a set of workers
/// Loops while fMergingMonitor has active sockets and fewer than 'connections'
/// workers have been merged: new connections arriving on fMergingSocket are
/// added to the monitor, while messages from already connected sockets are
/// read and their objects handed to mergerPlayer->AddOutputObject().
/// Afterwards the sockets in 'sockets' are closed and deleted.
/// Returns 'result', which is set to kFALSE when the wait is interrupted.
// NOTE(review): this listing skips lines 2240, 2248, 2251, 2256, 2274, 2314,
// 2316 and 2325-2326, so the signature and the declarations of 'result', 's'
// and 'sockets' (among others) are not visible here.
2239
2241{
 // NOTE(review): this allocation is handed to Recv() below and no matching
 // delete is visible in this extract - confirm that it is not leaked.
2242 TMessage *mess = new TMessage();
2243 Int_t mergedWorkers = 0;
2244
2245 PDB(kSubmerger, 1) Info("AcceptResults", "enter");
2246
2247 // Overall result of this procedure
2249
2250 fMergingMonitor = new TMonitor();
2252
2253 Int_t numworkers = 0;
2254 while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
2255
2257 if (!s) {
2258 Info("AcceptResults", "interrupt!");
2259 result = kFALSE;
2260 break;
2261 }
2262
2263 if (s == fMergingSocket) {
2264 // New incoming connection
2265 TSocket *sw = fMergingSocket->Accept();
2266 if (sw && sw != (TSocket *)(-1)) {
2267 fMergingMonitor->Add(sw);
2268
2269 PDB(kSubmerger, 2)
2270 Info("AcceptResults", "connection from a worker accepted on merger %s ",
2271 fOrdinal.Data());
2272 // All assigned workers are connected
2273 if (++numworkers >= connections)
2275 } else {
2276 PDB(kSubmerger, 1)
2277 Info("AcceptResults", "spurious signal found of merging socket");
2278 }
2279 } else {
 // Data from an already connected worker
2280 if (s->Recv(mess) < 0) {
2281 Error("AcceptResults", "problems receiving message");
2282 continue;
2283 }
2284 PDB(kSubmerger, 2)
2285 Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
2286 if (!mess) {
2287 Error("AcceptResults", "message received: %p ", mess);
2288 continue;
2289 }
2290 Int_t type = 0;
2291
2292 // Read output object(s) from the received message
2293 while ((mess->BufferSize() > mess->Length())) {
2294 (*mess) >> type;
2295
2296 PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
 // A type value of 2 is counted as one more merged worker
2297 if (type == 2) {
2298 mergedWorkers++;
2299 PDB(kSubmerger, 2)
2300 Info("AcceptResults",
2301 "a new worker has been mergerd. Total merged workers: %d",
2302 mergedWorkers);
2303 }
2304 TObject *o = mess->ReadObject(TObject::Class());
2305 if (mergerPlayer->AddOutputObject(o) == 1) {
2306 // Remove the object if it has been merged
2307 PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
2308 SafeDelete(o);
2309 } else
2310 PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
2311 }
2312 }
2313 }
2315
 // Close and delete every socket in 'sockets', then the list itself
2317 Int_t size = sockets->GetSize();
2318 for (Int_t i =0; i< size; ++i){
2319 ((TSocket*)(sockets->At(i)))->Close();
2320 PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
2321 delete ((TSocket*)(sockets->At(i)));
2322 }
2323 delete sockets;
2324
2327
2328 PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
2329 return result;
2330}
2331
2332////////////////////////////////////////////////////////////////////////////////
2333/// Handle Out-Of-Band data sent by the master or client.
/// Reads the single OOB byte from fSocket (discarding pending regular input
/// while the byte has not arrived yet) and then acts on its value in the
/// switch below: the three visible branches log a hard, soft or shutdown
/// interrupt respectively.
// NOTE(review): this listing skips lines 2335, 2344, 2362, 2388, 2393, 2411,
// 2429, 2434, 2447, 2452 and 2463, so the signature, the three 'case' labels,
// the statements that set 'nch' and the statements guarded by IsMaster() are
// not visible here.
2334
2336{
2337 char oob_byte;
2338 Int_t n, nch, wasted = 0;
2339
2340 const Int_t kBufSize = 1024;
2341 char waste[kBufSize];
2342
2343 // Real-time notification of messages
2345
2346 PDB(kGlobal, 5)
2347 Info("HandleUrgentData", "handling oob...");
2348
2349 // Receive the OOB byte
2350 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
2351 if (n == -2) { // EWOULDBLOCK
2352 //
2353 // The OOB data has not yet arrived: flush the input stream
2354 //
2355 // In some systems (Solaris) regular recv() does not return upon
2356 // receipt of the oob byte, which makes the below call to recv()
2357 // block indefinitely if there are no other data in the queue.
2358 // FIONREAD ioctl can be used to check if there are actually any
2359 // data to be flushed. If not, wait for a while for the oob byte
2360 // to arrive and try to read it again.
2361 //
2363 if (nch == 0) {
2364 gSystem->Sleep(1000);
2365 continue;
2366 }
2367
 // Never read more than the scratch buffer can hold
2368 if (nch > kBufSize) nch = kBufSize;
2369 n = fSocket->RecvRaw(waste, nch);
2370 if (n <= 0) {
2371 Error("HandleUrgentData", "error receiving waste");
2372 break;
2373 }
 // Remember that regular input was discarded (checked by the soft interrupt)
2374 wasted = 1;
2375 } else {
2376 Error("HandleUrgentData", "error receiving OOB");
2377 return;
2378 }
2379 }
2380
2381 PDB(kGlobal, 5)
2382 Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
2383
2384 if (fProof) fProof->SetActive();
2385
2386 switch (oob_byte) {
2387
2389 Info("HandleUrgentData", "*** Hard Interrupt");
2390
2391 // If master server, propagate interrupt to slaves
2392 if (IsMaster())
2394
2395 // Flush input socket
2396 while (1) {
2397 Int_t atmark;
2398
2399 fSocket->GetOption(kAtMark, atmark);
2400
2401 if (atmark) {
2402 // Send the OOB byte back so that the client knows where
2403 // to stop flushing its input stream of obsolete messages
2404 n = fSocket->SendRaw(&oob_byte, 1, kOob);
2405 if (n <= 0)
2406 Error("HandleUrgentData", "error sending OOB");
2407 break;
2408 }
2409
2410 // find out number of bytes to read before atmark
2412 if (nch == 0) {
2413 gSystem->Sleep(1000);
2414 continue;
2415 }
2416
2417 if (nch > kBufSize) nch = kBufSize;
2418 n = fSocket->RecvRaw(waste, nch);
2419 if (n <= 0) {
2420 Error("HandleUrgentData", "error receiving waste (2)");
2421 break;
2422 }
2423 }
2424
2425 SendLogFile();
2426
2427 break;
2428
2430 Info("HandleUrgentData", "Soft Interrupt");
2431
2432 // If master server, propagate interrupt to slaves
2433 if (IsMaster())
2435
 // If regular input had to be discarded above, Interrupt() is not called
2436 if (wasted) {
2437 Error("HandleUrgentData", "soft interrupt flushed stream");
2438 break;
2439 }
2440
2441 Interrupt();
2442
2443 SendLogFile();
2444
2445 break;
2446
2448 Info("HandleUrgentData", "Shutdown Interrupt");
2449
2450 // If master server, propagate interrupt to slaves
2451 if (IsMaster())
2453
2454 Terminate(0);
2455
2456 break;
2457
2458 default:
2459 Error("HandleUrgentData", "unexpected OOB byte");
2460 break;
2461 }
2462
2464}
2465
2466////////////////////////////////////////////////////////////////////////////////
2467/// Called when the client is not alive anymore (i.e. when kKeepAlive
2468/// has failed).
2469
2471{
2472 // Real-time notification of messages
2474
2475 if (IsMaster()) {
2476 // Check if we are here because client is closed. Try to ping client,
2477 // if that works, we are here because some slave died
2478 if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
2479 Info("HandleSigPipe", "keepAlive probe failed");
2480 // Tell slaves we are going to close since there is no client anymore
2481
2482 fProof->SetActive();
2485 Terminate(0);
2486 }
2487 } else {
2488 Info("HandleSigPipe", "keepAlive probe failed");
2489 Terminate(0); // will not return from here....
2490 }
2491}
2492
2493////////////////////////////////////////////////////////////////////////////////
2494/// True if in parallel mode.
2495
2497{
2498 if (IsMaster() && fProof)
2499 return fProof->IsParallel() || fProof->UseDynamicStartup() ;
2500
2501 // false in case we are a slave
2502 return kFALSE;
2503}
2504
2505////////////////////////////////////////////////////////////////////////////////
2506/// Print status of slave server.
2507
2509{
2510 if (IsMaster() && fProof)
2512 else
2513 Printf("This is worker %s", gSystem->HostName());
2514}
2515
2516////////////////////////////////////////////////////////////////////////////////
2517/// Redirect stdout to a log file. This log file will be flushed to the
2518/// client or master after each command.
2519
2520void TProofServ::RedirectOutput(const char *dir, const char *mode)
2521{
2522 char logfile[512];
2523
2524 TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
2525 if (IsMaster()) {
2526 snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
2527 } else {
2528 snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
2529 }
2530
2531 if ((freopen(logfile, mode, stdout)) == 0)
2532 SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
2533
2534 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2535 SysError("RedirectOutput", "could not redirect stderr");
2536
2537 if ((fLogFile = fopen(logfile, "r")) == 0)
2538 SysError("RedirectOutput", "could not open logfile '%s'", logfile);
2539
2540 // from this point on stdout and stderr are properly redirected
2541 if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
2542 Warning("RedirectOutput", "no way to tell master (or client) where"
2543 " to upload packages");
2544 }
2545}
2546
2547////////////////////////////////////////////////////////////////////////////////
2548/// Reset PROOF environment to be ready for execution of next command.
2549
2550void TProofServ::Reset(const char *dir)
2551{
2552 // First go to new directory. Check first that we got a reasonable path;
2553 // in PROOF-Lite it may not be the case
2554 TString dd(dir);
2555 if (!dd.BeginsWith("proofserv")) {
2556 Int_t ic = dd.Index(":");
2557 if (ic != kNPOS)
2558 dd.Replace(0, ic, "proofserv");
2559 }
2560 gDirectory->cd(dd.Data());
2561
2562 // Clear interpreter environment.
2563 gROOT->Reset();
2564
2565 // Make sure current directory is empty (don't delete anything when
2566 // we happen to be in the ROOT memory only directory!?)
2567 if (gDirectory != gROOT) {
2568 gDirectory->Delete();
2569 }
2570
2572}
2573
2574////////////////////////////////////////////////////////////////////////////////
2575/// Receive a file, either sent by a client or a master server.
2576/// If bin is true it is a binary file, otherwise it is an ASCII
2577/// file and we need to check for Windows \\r tokens. Returns -1 in
2578/// case of error, 0 otherwise.
2579
2581{
2582 if (size <= 0) return 0;
2583
2584 // open file, overwrite already existing file
2585 Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2586 if (fd < 0) {
2587 SysError("ReceiveFile", "error opening file %s", file);
2588 return -1;
2589 }
2590
2591 const Int_t kMAXBUF = 16384; //32768 //16384 //65536;
2592 char buf[kMAXBUF], cpy[kMAXBUF];
2593
2594 Int_t left, r;
2595 Long64_t filesize = 0;
2596
2597 while (filesize < size) {
2598 left = Int_t(size - filesize);
2599 if (left > kMAXBUF)
2600 left = kMAXBUF;
2601 r = fSocket->RecvRaw(&buf, left);
2602 if (r > 0) {
2603 char *p = buf;
2604
2605 filesize += r;
2606 while (r) {
2607 Int_t w;
2608
2609 if (!bin) {
2610 Int_t k = 0, i = 0, j = 0;
2611 char *q;
2612 while (i < r) {
2613 if (p[i] == '\r') {
2614 i++;
2615 k++;
2616 }
2617 cpy[j++] = buf[i++];
2618 }
2619 q = cpy;
2620 r -= k;
2621 w = write(fd, q, r);
2622 } else {
2623 w = write(fd, p, r);
2624 }
2625
2626 if (w < 0) {
2627 SysError("ReceiveFile", "error writing to file %s", file);
2628 close(fd);
2629 return -1;
2630 }
2631 r -= w;
2632 p += w;
2633 }
2634 } else if (r < 0) {
2635 Error("ReceiveFile", "error during receiving file %s", file);
2636 close(fd);
2637 return -1;
2638 }
2639 }
2640
2641 close(fd);
2642
2643 if (chmod(file, 0644) != 0)
2644 Warning("ReceiveFile", "error setting mode 0644 on file %s", file);
2645
2646 return 0;
2647}
2648
2649////////////////////////////////////////////////////////////////////////////////
2650/// Main server eventloop.
2651
2653{
2654 // Setup the server
2655 if (CreateServer() == 0) {
2656
2657 // Run the main event loop
2658 TApplication::Run(retrn);
2659 }
2660}
2661
2662////////////////////////////////////////////////////////////////////////////////
2663/// Send log file to master.
2664/// If start > -1 send only bytes in the range from start to end,
2665/// if end <= start send everything from start.
2666
2668{
2669 // Determine the number of bytes left to be read from the log file.
2670 fflush(stdout);
2671
2672 // On workers we do not send the logs to masters (to avoid duplication of
2673 // text) unless asked explicitly, e.g. after an Exec(...) request.
2674 if (!IsMaster()) {
2675 if (!fSendLogToMaster) {
2676 FlushLogFile();
2677 } else {
2678 // Decide case by case
2680 }
2681 }
2682
2683 off_t ltot=0, lnow=0;
2684 Int_t left = -1;
2685 Bool_t adhoc = kFALSE;
2686
2687 if (fLogFileDes > -1) {
2688 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2689 lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
2690
2691 if (ltot >= 0 && lnow >= 0) {
2692 if (start > -1) {
2693 lseek(fLogFileDes, (off_t) start, SEEK_SET);
2694 if (end <= start || end > ltot)
2695 end = ltot;
2696 left = (Int_t)(end - start);
2697 if (end < ltot)
2698 left++;
2699 adhoc = kTRUE;
2700 } else {
2701 left = (Int_t)(ltot - lnow);
2702 }
2703 }
2704 }
2705
2706 if (left > 0) {
2707 if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
2708 SysError("SendLogFile", "error sending kPROOF_LOGFILE");
2709 return;
2710 }
2711
2712 const Int_t kMAXBUF = 32768; //16384 //65536;
2713 char buf[kMAXBUF];
2714 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2715 Int_t len;
2716 do {
2717 while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
2718 TSystem::GetErrno() == EINTR)
2720
2721 if (len < 0) {
2722 SysError("SendLogFile", "error reading log file");
2723 break;
2724 }
2725
2726 if (end == ltot && len == wanted)
2727 buf[len-1] = '\n';
2728
2729 if (fSocket->SendRaw(buf, len) < 0) {
2730 SysError("SendLogFile", "error sending log file");
2731 break;
2732 }
2733
2734 // Update counters
2735 left -= len;
2736 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2737
2738 } while (len > 0 && left > 0);
2739 }
2740
2741 // Restore initial position if partial send
2742 if (adhoc && lnow >=0 )
2743 lseek(fLogFileDes, lnow, SEEK_SET);
2744
2746 if (IsMaster())
2747 mess << status << (fProof ? fProof->GetParallel() : 0);
2748 else
2749 mess << status << (Int_t) 1;
2750
2751 if (fSocket->Send(mess) < 0) {
2752 SysError("SendLogFile", "error sending kPROOF_LOGDONE");
2753 return;
2754 }
2755
2756 PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
2757}
2758
2759////////////////////////////////////////////////////////////////////////////////
2760/// Send statistics of slave server to master or client.
2761
2763{
2764 Long64_t bytesread = TFile::GetFileBytesRead();
2765 Float_t cputime = fCpuTime, realtime = fRealTime;
2766 if (IsMaster()) {
2767 bytesread = fProof->GetBytesRead();
2768 cputime = fProof->GetCpuTime();
2769 }
2770
2772 TString workdir = gSystem->WorkingDirectory(); // expect TString on other side
2773 mess << bytesread << realtime << cputime << workdir;
2774 if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
2775 mess << TString(gProofServ->GetImage());
2776 fSocket->Send(mess);
2777}
2778
2779////////////////////////////////////////////////////////////////////////////////
2780/// Send number of parallel nodes to master or client.
2781
2783{
2784 Int_t nparallel = 0;
2785 if (IsMaster()) {
2786 PDB(kGlobal, 2)
2787 Info("SendParallel", "Will invoke AskParallel()");
2789 PDB(kGlobal, 2)
2790 Info("SendParallel", "Will invoke GetParallel()");
2791 nparallel = fProof->GetParallel();
2792 } else {
2793 nparallel = 1;
2794 }
2795
2797 mess << nparallel << async;
2798 fSocket->Send(mess);
2799}
2800
2801////////////////////////////////////////////////////////////////////////////////
2802/// Print the ProofServ logo on standard output.
2803/// Return 0 on success, -1 on failure
2804
2806{
2807 char str[512];
2808
2809 if (IsMaster()) {
2810 snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
2811 } else {
2812 snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
2813 }
2814
2815 if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
2816 Error("Setup", "failed to send proof server startup message");
2817 return -1;
2818 }
2819
2820 // exchange protocol level between client and master and between
2821 // master and slave
2822 Int_t what;
2823 if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
2824 Error("Setup", "failed to receive remote proof protocol");
2825 return -1;
2826 }
2827 if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
2828 Error("Setup", "failed to send local proof protocol");
2829 return -1;
2830 }
2831
2832 // If old version, setup authentication related stuff
2833 if (fProtocol < 5) {
2834 TString wconf;
2835 if (OldAuthSetup(wconf) != 0) {
2836 Error("Setup", "OldAuthSetup: failed to setup authentication");
2837 return -1;
2838 }
2839 if (IsMaster()) {
2840 fConfFile = wconf;
2841 fWorkDir.Form("~/%s", kPROOF_WorkDir);
2842 } else {
2843 if (fProtocol < 4) {
2844 fWorkDir.Form("~/%s", kPROOF_WorkDir);
2845 } else {
2846 fWorkDir = wconf;
2847 if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
2848 }
2849 }
2850 } else {
2851
2852 // Receive some useful information
2853 TMessage *mess;
2854 if ((fSocket->Recv(mess) <= 0) || !mess) {
2855 Error("Setup", "failed to receive ordinal and config info");
2856 return -1;
2857 }
2858 if (IsMaster()) {
2859 (*mess) >> fUser >> fOrdinal >> fConfFile;
2860 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2861 } else {
2862 (*mess) >> fUser >> fOrdinal >> fWorkDir;
2863 if (fWorkDir.IsNull())
2864 fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2865 }
2866 // Set the correct prefix
2867 if (fOrdinal != "-1")
2868 fPrefix += fOrdinal;
2870 delete mess;
2871 }
2872
2873 if (IsMaster()) {
2874
2875 // strip off any prooftype directives
2876 TString conffile = fConfFile;
2877 conffile.Remove(0, 1 + conffile.Index(":"));
2878
2879 // parse config file to find working directory
2880 TProofResourcesStatic resources(fConfDir, conffile);
2881 if (resources.IsValid()) {
2882 if (resources.GetMaster()) {
2883 TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
2884 if (tmpWorkDir != "")
2885 fWorkDir = tmpWorkDir;
2886 }
2887 } else {
2888 Info("Setup", "invalid config file %s (missing or unreadable",
2889 resources.GetFileName().Data());
2890 }
2891 }
2892
2893 // Set $HOME and $PATH. The HOME directory was already set to the
2894 // user's home directory by proofd.
2895 gSystem->Setenv("HOME", gSystem->HomeDirectory());
2896
2897 // Add user name in case of non default workdir
2898 if (fWorkDir.BeginsWith("/") &&
2900 if (!fWorkDir.EndsWith("/"))
2901 fWorkDir += "/";
2903 if (u) {
2904 fWorkDir += u->fUser;
2905 delete u;
2906 }
2907 }
2908
2909 // Go to the main PROOF working directory
2911 if (gProofDebugLevel > 0)
2912 Info("Setup", "working directory set to %s", fWorkDir.Data());
2913
2914 // host first name
2915 TString host = gSystem->HostName();
2916 if (host.Index(".") != kNPOS)
2917 host.Remove(host.Index("."));
2918
2919 // Session tag
2920 fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
2921 (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
2923
2924 // create session directory and make it the working directory
2926 if (IsMaster())
2927 fSessionDir += "/master-";
2928 else
2929 fSessionDir += "/slave-";
2931
2932 // Common setup
2933 if (SetupCommon() != 0) {
2934 Error("Setup", "common setup failed");
2935 return -1;
2936 }
2937
2938 // Incoming OOB should generate a SIGURG
2940
2941 // Send packets off immediately to reduce latency
2943
2944 // Check every two hours if client is still alive
2946
2947 // Done
2948 return 0;
2949}
2950
2951////////////////////////////////////////////////////////////////////////////////
2952/// Common part (between TProofServ and TXProofServ) of the setup phase.
2953/// Return 0 on success, -1 on error
2954
2956{
2957 // deny write access for group and world
2958 gSystem->Umask(022);
2959
2960#ifdef R__UNIX
2961 // Add bindir to PATH
2962 TString path(gSystem->Getenv("PATH"));
2963 TString bindir(TROOT::GetBinDir());
2964 // Augment PATH, if required
2965 // ^<compiler>, <compiler>, ^<sysbin>, <sysbin>
2966 TString paths = gEnv->GetValue("ProofServ.BinPaths", "");
2967 if (paths.Length() > 0) {
2968 Int_t icomp = 0;
2969 if (paths.Contains("^<compiler>"))
2970 icomp = 1;
2971 else if (paths.Contains("<compiler>"))
2972 icomp = -1;
2973 if (icomp != 0) {
2974# ifdef COMPILER
2975 TString compiler = COMPILER;
2976 if (compiler.Index("is ") != kNPOS)
2977 compiler.Remove(0, compiler.Index("is ") + 3);
2978 compiler = gSystem->GetDirName(compiler);
2979 if (icomp == 1) {
2980 if (!bindir.IsNull()) bindir += ":";
2981 bindir += compiler;
2982 } else if (icomp == -1) {
2983 if (!path.IsNull()) path += ":";
2984 path += compiler;
2985 }
2986#endif
2987 }
2988 Int_t isysb = 0;
2989 if (paths.Contains("^<sysbin>"))
2990 isysb = 1;
2991 else if (paths.Contains("<sysbin>"))
2992 isysb = -1;
2993 if (isysb != 0) {
2994 if (isysb == 1) {
2995 if (!bindir.IsNull()) bindir += ":";
2996 bindir += "/bin:/usr/bin:/usr/local/bin";
2997 } else if (isysb == -1) {
2998 if (!path.IsNull()) path += ":";
2999 path += "/bin:/usr/bin:/usr/local/bin";
3000 }
3001 }
3002 }
3003 // Final insert
3004 if (!bindir.IsNull()) bindir += ":";
3005 path.Insert(0, bindir);
3006 gSystem->Setenv("PATH", path);
3007#endif
3008
3012 Error("SetupCommon", "can not change to PROOF directory %s",
3013 fWorkDir.Data());
3014 return -1;
3015 }
3016 } else {
3021 Error("SetupCommon", "can not change to PROOF directory %s",
3022 fWorkDir.Data());
3023 return -1;
3024 }
3025 }
3026 }
3027
3028 // Set group
3029 fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
3030
3031 // Check and make sure "cache" directory exists
3032 fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
3037 if (gProofDebugLevel > 0)
3038 Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
3039 fCacheLock =
3040 new TProofLockPath(TString::Format("%s/%s%s",
3042 TString(fCacheDir).ReplaceAll("/","%").Data()));
3043 // Make also sure the cache path is in the macro path
3045
3046 // Check and make sure "packages" directory exists
3047 TString packdir = gEnv->GetValue("ProofServ.PackageDir",
3049 ResolveKeywords(packdir);
3050 if (gSystem->AccessPathName(packdir))
3051 gSystem->mkdir(packdir, kTRUE);
3052 fPackMgr = new TPackMgr(packdir);
3054 // Notification message
3055 TString noth;
3056 const char *k = (IsMaster()) ? "Mst" : "Wrk";
3057 noth.Form("%s-%s", k, fOrdinal.Data());
3058 fPackMgr->SetPrefix(noth.Data());
3059 if (gProofDebugLevel > 0)
3060 Info("SetupCommon", "package directory set to %s", packdir.Data());
3061
3062 // Check and make sure "data" directory exists
3063 fDataDir = gEnv->GetValue("ProofServ.DataDir","");
3064 Ssiz_t isep = kNPOS;
3065 if (fDataDir.IsNull()) {
3066 // Use default
3067 fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
3068 } else if ((isep = fDataDir.Last(' ')) != kNPOS) {
3069 fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
3070 fDataDir.Remove(isep);
3071 }
3074 if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
3075 Warning("SetupCommon", "problems creating path '%s' (errno: %d)",
3077 }
3078 if (gProofDebugLevel > 0)
3079 Info("SetupCommon", "data directory set to %s", fDataDir.Data());
3080
3081 // Check and apply possible options
3082 TString dataDirOpts = gEnv->GetValue("ProofServ.DataDirOpts","");
3083 if (!dataDirOpts.IsNull()) {
3084 // Do they apply to this server type
3085 Bool_t doit = kTRUE;
3086 if ((IsMaster() && !dataDirOpts.Contains("M")) ||
3087 (!IsMaster() && !dataDirOpts.Contains("W"))) doit = kFALSE;
3088 if (doit) {
3089 // Get the wanted mode
3090 UInt_t m = 0755;
3091 if (dataDirOpts.Contains("g")) m = 0775;
3092 if (dataDirOpts.Contains("a") || dataDirOpts.Contains("o")) m = 0777;
3093 if (gProofDebugLevel > 0)
3094 Info("SetupCommon", "requested mode for data directories is '%o'", m);
3095 // Loop over paths
3096 FileStat_t st;
3097 TString p, subp;
3098 Int_t from = 0;
3099 if (fDataDir.BeginsWith("/")) p = "/";
3100 while (fDataDir.Tokenize(subp, from, "/")) {
3101 if (subp.IsNull()) continue;
3102 p += subp;
3103 if (gSystem->GetPathInfo(p, st) == 0) {
3104 if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
3105 if (gSystem->Chmod(p.Data(), m) != 0) {
3106 Warning("SetupCommon", "problems setting mode '%o' on path '%s' (errno: %d)",
3107 m, p.Data(), TSystem::GetErrno());
3108 break;
3109 }
3110 }
3111 p += "/";
3112 } else {
3113 Warning("SetupCommon", "problems stat-ing path '%s' (errno: %d; datadir: %s)",
3114 p.Data(), TSystem::GetErrno(), fDataDir.Data());
3115 break;
3116 }
3117 }
3118 }
3119 }
3120
3121 // List of directories where to look for global packages
3122 TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
3123
3124 ResolveKeywords(globpack);
3125 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
3126 Info("SetupCommon", " %d global package directories registered", nglb);
3127 FlushLogFile();
3128
3129 // Check the session dir
3135 Error("SetupCommon", "can not change to working directory '%s'",
3136 fSessionDir.Data());
3137 return -1;
3138 }
3139 }
3140 gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
3141 if (gProofDebugLevel > 0)
3142 Info("SetupCommon", "session dir is '%s'", fSessionDir.Data());
3143
3144 // On masters, check and make sure that "queries" and "datasets"
3145 // directories exist
3146 if (IsMaster()) {
3147
3148 // Make sure that the 'queries' dir exist
3154 fQueryDir += TString("/session-") + fTopSessionTag;
3157 if (gProofDebugLevel > 0)
3158 Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
3159
3160 // Create 'queries' locker instance and lock it
3161 fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
3164 TString(fQueryDir).ReplaceAll("/","%").Data()));
3165 fQueryLock->Lock();
3166 // Create the query manager
3168 fQueryLock, 0);
3169 }
3170
3171 // Server image
3172 fImage = gEnv->GetValue("ProofServ.Image", "");
3173
3174 // Get the group priority
3175 if (IsMaster()) {
3176 // Send session tag to client
3178 m << fTopSessionTag << fGroup << fUser;
3179 fSocket->Send(m);
3180 // Group priority
3182 // Dataset manager instance via plug-in
3183 TPluginHandler *h = 0;
3184 TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
3185 if (!dsms.IsNull()) {
3186 TString dsm;
3187 Int_t from = 0;
3188 while (dsms.Tokenize(dsm, from, ",")) {
3190 Warning("SetupCommon", "a valid dataset manager already initialized");
3191 Warning("SetupCommon", "support for multiple managers not yet available");
3192 break;
3193 }
3194 // Get plugin manager to load the appropriate TDataSetManager
3195 if (gROOT->GetPluginManager()) {
3196 // Find the appropriate handler
3197 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
3198 if (h && h->LoadPlugin() != -1) {
3199 // make instance of the dataset manager
3201 reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
3202 fUser.Data(), dsm.Data()));
3203 }
3204 }
3205 }
3206 // Check the result of the dataset manager initialization
3208 Warning("SetupCommon", "dataset manager plug-in initialization failed");
3209 SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3211 }
3212 } else {
3213 // Initialize the default dataset manager
3214 TString opts("Av:");
3215 TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
3216 if (dsetdir.IsNull()) {
3217 // Use the default in the sandbox
3218 dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
3221 opts += "Sb:";
3222 }
3223 // Find the appropriate handler
3224 if (!h) {
3225 h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
3226 if (h && h->LoadPlugin() == -1) h = 0;
3227 }
3228 if (h) {
3229 // make instance of the dataset manager
3230 TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
3231 fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
3232 fGroup.Data(), fUser.Data(), oo.Data()));
3233 }
3235 Warning("SetupCommon", "default dataset manager plug-in initialization failed");
3237 }
3238 }
3239 // Dataset manager for staging requests
3240 TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
3241 if (!dsReqCfg.IsNull()) {
3242 TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
3243
3244 if (reReqDir.Match(dsReqCfg) == 5) {
3245 TString dsDirFmt;
3246 dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
3247 fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_",
3248 dsDirFmt);
3249 if (fDataSetStgRepo &&
3251 Warning("SetupCommon",
3252 "failed init of dataset staging requests repository");
3254 }
3255 } else {
3256 Warning("SetupCommon",
3257 "specify, with [dir:]<path>, a valid path for staging requests");
3258 }
3259 } else if (gProofDebugLevel > 0) {
3260 Warning("SetupCommon", "no repository for staging requests available");
3261 }
3262 }
3263
3264 // Quotas
3265 TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
3266 if (quotas.IsNull())
3267 quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
3268 if (quotas.IsNull())
3269 quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
3270 if (!quotas.IsNull()) {
3271 // Parse it; format ("maxquerykept=10 hwmsz=800m maxsz=1g")
3272 TString tok;
3273 Ssiz_t from = 0;
3274 while (quotas.Tokenize(tok, from, " ")) {
3275 // Set max number of query results to keep
3276 if (tok.BeginsWith("maxquerykept=")) {
3277 tok.ReplaceAll("maxquerykept=","");
3278 if (tok.IsDigit())
3279 fMaxQueries = tok.Atoi();
3280 else
3281 Info("SetupCommon",
3282 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3283 }
3284 // Set High-Water-Mark or max on the sandbox size
3285 const char *ksz[2] = {"hwmsz=", "maxsz="};
3286 for (Int_t j = 0; j < 2; j++) {
3287 if (tok.BeginsWith(ksz[j])) {
3288 tok.ReplaceAll(ksz[j],"");
3289 Long64_t fact = -1;
3290 if (!tok.IsDigit()) {
3291 // Parse (k, m, g)
3292 tok.ToLower();
3293 const char *s[3] = {"k", "m", "g"};
3294 Int_t i = 0, ki = 1024;
3295 while (fact < 0) {
3296 if (tok.EndsWith(s[i++]))
3297 fact = ki;
3298 else
3299 ki *= 1024;
3300 }
3301 tok.Remove(tok.Length()-1);
3302 }
3303 if (tok.IsDigit()) {
3304 if (j == 0)
3305 fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3306 else
3307 fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3308 } else {
3309 TString ssz(ksz[j], strlen(ksz[j])-1);
3310 Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
3311 }
3312 }
3313 }
3314 }
3315 }
3316
3317 // Apply quotas, if any
3318 if (IsMaster() && fQMgr)
3320 Warning("SetupCommon", "problems applying fMaxQueries");
3321
3322 // Send "ROOTversion|ArchCompiler" flag
3323 if (fProtocol > 12) {
3324 TString vac = gROOT->GetVersion();
3325 vac += TString::Format(":%s", gROOT->GetGitCommit());
3326 TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
3327 if (rtag.Length() > 0)
3328 vac += TString::Format(":%s", rtag.Data());
3331 m << vac;
3332 fSocket->Send(m);
3333 }
3334
3335 // Set user vars in TProof
3336 TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
3337 TString name;
3338 Int_t from = 0;
3339 while (all_vars.Tokenize(name, from, ",")) {
3340 if (!name.IsNull()) {
3343 }
3344 }
3345
3346 if (fgLogToSysLog > 0) {
3347 // Set the syslog entity (all the information is available now)
3348 if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
3349 fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
3350 } else if (!(fUser.IsNull()) && fGroup.IsNull()) {
3351 fgSysLogEntity.Form("%s:default", fUser.Data());
3352 } else if (fUser.IsNull() && !(fGroup.IsNull())) {
3353 fgSysLogEntity.Form("undef:%s", fGroup.Data());
3354 }
3355 // Log the beginning of this session
3356 TString s;
3357 s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
3359 }
3360
3361 if (gProofDebugLevel > 0)
3362 Info("SetupCommon", "successfully completed");
3363
3364 // Done
3365 return 0;
3366}
3367
3368////////////////////////////////////////////////////////////////////////////////
3369/// Terminate the proof server.
3370
3372{
3373 if (fgLogToSysLog > 0) {
3374 TString s;
3375 s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
3377 }
3378
3379 // Notify the memory footprint
3380 ProcInfo_t pi;
3381 if (!gSystem->GetProcInfo(&pi)){
3382 Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3384 }
3385
3386 // Cleanup session directory
3387 if (status == 0) {
3388 // make sure we remain in a "connected" directory
3390 // needed in case fSessionDir is on NFS ?!
3391 gSystem->MakeDirectory(fSessionDir+"/.delete");
3393 }
3394
3395 // Cleanup queries directory if empty
3396 if (IsMaster()) {
3397 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
3398 // make sure we remain in a "connected" directory
3400 // needed in case fQueryDir is on NFS ?!
3401 gSystem->MakeDirectory(fQueryDir+"/.delete");
3403 // Remove lock file
3404 if (fQueryLock)
3406 }
3407
3408 // Unlock the query dir owned by this session
3409 if (fQueryLock)
3410 fQueryLock->Unlock();
3411 }
3412
3413 // Cleanup data directory if empty
3416 Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
3417 }
3418
3419 // Remove input handler to avoid spurious signals in socket
3420 // selection for closing activities executed upon exit()
3422 TObject *fh = 0;
3423 while ((fh = next())) {
3424 TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
3425 if (ih)
3427 }
3428
3429 // Stop processing events
3430 gSystem->ExitLoop();
3431
3432 // Exit() is called in pmain
3433}
3434
3435////////////////////////////////////////////////////////////////////////////////
3436/// Scan recursively the datadir and unlink it if empty
3437/// Return kTRUE if it can be unlinked, kFALSE otherwise
3438
3440{
3441 if (!path || strlen(path) <= 0) return kFALSE;
3442
3443 Bool_t dorm = kTRUE;
3444 void *dirp = gSystem->OpenDirectory(path);
3445 if (dirp) {
3446 TString fpath;
3447 const char *ent = 0;
3448 while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
3449 if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
3450 fpath.Form("%s/%s", path, ent);
3451 FileStat_t st;
3452 if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
3453 dorm = UnlinkDataDir(fpath);
3454 } else {
3455 dorm = kFALSE;
3456 }
3457 }
3458 // Close the directory
3459 gSystem->FreeDirectory(dirp);
3460 } else {
3461 // Cannot open the directory
3462 dorm = kFALSE;
3463 }
3464
3465 // Do remove, if required
3466 if (dorm && gSystem->Unlink(path) != 0)
3467 Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
3468 // done
3469 return dorm;
3470}
3471
3472////////////////////////////////////////////////////////////////////////////////
3473/// Static function that returns kTRUE in case we are a PROOF server.
3474
3476{
3477 return gProofServ ? kTRUE : kFALSE;
3478}
3479
3480////////////////////////////////////////////////////////////////////////////////
3481/// Static function returning pointer to global object gProofServ.
3482/// Mainly for use via CINT, where the gProofServ symbol might be
3483/// deleted from the symbol table.
3484
3486{
   // Hands back the global gProofServ pointer unchanged; no null check is done here.
3487 return gProofServ;
3488}
3489
3490////////////////////////////////////////////////////////////////////////////////
3491/// Setup authentication related stuff for old versions.
3492/// Provided for backward compatibility.
3493
3495{
3496 OldProofServAuthSetup_t oldAuthSetupHook = 0;
3497
3498 if (!oldAuthSetupHook) {
3499 // Load libraries needed for (server) authentication ...
3500 TString authlib = "libRootAuth";
3501 char *p = 0;
3502 // The generic one
3503 if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
3504 delete[] p;
3505 if (gSystem->Load(authlib) == -1) {
3506 Error("OldAuthSetup", "can't load %s",authlib.Data());
3507 return kFALSE;
3508 }
3509 } else {
3510 Error("OldAuthSetup", "can't locate %s",authlib.Data());
3511 return -1;
3512 }
3513 //
3514 // Locate OldProofServAuthSetup
3515 Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
3516 if (f)
3517 oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
3518 else {
3519 Error("OldAuthSetup", "can't find OldProofServAuthSetup");
3520 return -1;
3521 }
3522 }
3523 //
3524 // Setup
3525 return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
3526 fUser, fOrdinal, conf);
3527}
3528
3529////////////////////////////////////////////////////////////////////////////////
3530/// Create a TProofQueryResult instance for this query.
3531
3533 const char *opt,
3534 TList *inlist, Long64_t fst,
3535 TDSet *dset, const char *selec,
3536 TObject *elist)
3537{
3538 // Increment sequential number
3539 Int_t seqnum = -1;
3540 if (fQMgr) {
3542 seqnum = fQMgr->SeqNum();
3543 }
3544
3545 // Locally we always use the current streamer
3546 Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
3547 if (olds)
3548 dset->SetWriteV3(kFALSE);
3549
3550 // Create the instance and add it to the list
3551 TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
3552 fst, dset, selec, elist);
3553 // Title is the session identifier
3555
3556 // Restore old streamer info
3557 if (olds)
3558 dset->SetWriteV3(kTRUE);
3559
3560 return pqr;
3561}
3562
3563////////////////////////////////////////////////////////////////////////////////
3564/// Set query in running state.
/// Records the current end offset of stdout as the log start position, logs a
/// header, collects the enabled packages from fPackMgr and stores all of this
/// in 'pq' via SetRunning() / SetProcessInfo().
/// NOTE(review): listing lines 3566 (signature) and 3586 (remaining
/// SetProcessInfo arguments) are missing from this extract.
3565
3567{
3568 // Record current position in the log file at start
3569 fflush(stdout);
// NOTE(review): the lseek() result is stored in an Int_t
3570 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3571
3572 // Add some header to logs
3573 Printf(" ");
3574 Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
3575
3576 // Build the list of loaded PAR packages
3577 TString parlist = "";
// NOTE(review): fPackMgr is dereferenced without a null check
3578 fPackMgr->GetEnabledPackages(parlist);
3579
3580 if (fProof) {
3581 // Set in running state
3582 pq->SetRunning(startlog, parlist, fProof->GetParallel());
3583
3584 // Bytes and CPU at start (we will calculate the differential at end)
3585 pq->SetProcessInfo(pq->GetEntries(),
3587 } else {
3588 // Set in running state; -1 is passed in place of fProof->GetParallel()
3589 pq->SetRunning(startlog, parlist, -1);
3590
3591 // Bytes and CPU at start (we will calculate the differential at end)
3592 pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
3593 }
3594}
3595
3596////////////////////////////////////////////////////////////////////////////////
3597/// Handle archive request.
/// Reads a query reference and an archive path from 'mess'. The reference
/// "Default" only records the path in fArchivePath. Otherwise the referenced
/// query result is written to the archive file; when the message carries an
/// empty path, one is derived from fArchivePath.
/// If 'slb' is non-null it is filled with "<queryref> <path>".
/// NOTE(review): listing lines 3599 (signature) and 3631 (arguments of the
/// path.Form() call) are missing from this extract.
3598
3600{
3601 PDB(kGlobal, 1)
3602 Info("HandleArchive", "Enter");
3603
3604 TString queryref;
3605 TString path;
3606 (*mess) >> queryref >> path;
3607
3608 if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
3609
3610 // If this is a set default action just save the default
3611 if (queryref == "Default") {
3612 fArchivePath = path;
3613 Info("HandleArchive",
3614 "default path set to %s", fArchivePath.Data());
3615 return;
3616 }
3617
// Look the query up through the query manager; pqm keeps the located
// instance so that its archived status can be updated further down
3618 Int_t qry = -1;
3619 TString qdir;
3620 TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
3621 TProofQueryResult *pqm = pqr;
3622
// No explicit path in the request: build one from the default archive path
3623 if (path.Length() <= 0) {
3624 if (fArchivePath.Length() <= 0) {
3625 Info("HandleArchive",
3626 "archive paths are not defined - do nothing");
3627 return;
3628 }
3629 if (qry > 0) {
3630 path.Form("%s/session-%s-%d.root",
3632 } else {
3633 path = queryref;
3634 path.ReplaceAll(":q","-");
3635 path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
3636 path += ".root";
3637 }
3638 }
3639
3640 // Build file name for specific query
3641 if (!pqr || qry < 0) {
3642 TString fout = qdir;
3643 fout += "/query-result.root";
3644
// Take the first key of class TProofQueryResult that can be read back
3645 TFile *f = TFile::Open(fout,"READ");
3646 pqr = 0;
3647 if (f) {
3648 f->ReadKeys();
3649 TIter nxk(f->GetListOfKeys());
3650 TKey *k = 0;
3651 while ((k = (TKey *)nxk())) {
3652 if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
// NOTE(review): the object returned by Get() is not deleted anywhere in
// the visible code - confirm ownership
3653 pqr = (TProofQueryResult *) f->Get(k->GetName());
3654 if (pqr)
3655 break;
3656 }
3657 }
3658 f->Close();
3659 delete f;
3660 } else {
3661 Info("HandleArchive",
3662 "file cannot be open (%s)",fout.Data());
3663 return;
3664 }
3665 }
3666
3667 if (pqr) {
3668
3669 PDB(kGlobal, 1) Info("HandleArchive",
3670 "archive path for query #%d: %s",
3671 qry, path.Data());
// AccessPathName() is used to pick the open mode: "NEW" when it returns
// true, "UPDATE" otherwise
// NOTE(review): farc is neither closed nor deleted in the visible code,
// including on the early return below - confirm whether this leaks
3672 TFile *farc = 0;
3673 if (gSystem->AccessPathName(path))
3674 farc = TFile::Open(path,"NEW");
3675 else
3676 farc = TFile::Open(path,"UPDATE");
3677 if (!farc || !(farc->IsOpen())) {
3678 Info("HandleArchive",
3679 "archive file cannot be open (%s)",path.Data());
3680 return;
3681 }
3682 farc->cd();
3683
3684 // Update query status
3685 pqr->SetArchived(path);
3686 if (pqm)
3687 pqm->SetArchived(path);
3688
3689 // Write to file
3690 pqr->Write();
3691
3692 // Update temporary files too
3693 if (qry > -1 && fQMgr)
3694 fQMgr->SaveQuery(pqr);
3695
3696 // Notify
3697 Info("HandleArchive",
3698 "results of query %s archived to file %s",
3699 queryref.Data(), path.Data());
3700 }
3701
3702 // Done
3703 return;
3704}
3705
3706////////////////////////////////////////////////////////////////////////////////
3707/// Get a map {server-name, list-of-files} for collection 'fc' to be used in
3708/// TPacketizerFile. Returns a pointer to the map (ownership of the caller).
3709/// Or (TMap *)0 and an error message in emsg.
3710
3712{
3713 TMap *fcmap = 0;
3714 emsg = "";
3715
3716 // Sanity checks
3717 if (!fc) {
3718 emsg.Form("file collection undefined!");
3719 return fcmap;
3720 }
3721
3722 // Prepare data set map
3723 fcmap = new TMap();
3724
3725 TIter nxf(fc->GetList());
3726 TFileInfo *fiind = 0;
3727 TString key;
3728 while ((fiind = (TFileInfo *)nxf())) {
3729 TUrl *xurl = fiind->GetCurrentUrl();
3730 // Find the key for this server
3731 key.Form("%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
3732 if (xurl->GetPort() > 0)
3733 key += TString::Format(":%d", xurl->GetPort());
3734 // Get the map entry for this key
3735 TPair *ent = 0;
3736 THashList* l = 0;
3737 if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
3738 // Attach to the list
3739 l = (THashList *) ent->Value();
3740 } else {
3741 // Create list
3742 l = new THashList;
3743 l->SetOwner(kTRUE);
3744 // Add it to the map
3745 fcmap->Add(new TObjString(key.Data()), l);
3746 }
3747 // Add fileinfo with index to list
3748 l->Add(fiind);
3749 }
3750
3751 // Done
3752 return fcmap;
3753}
3754
3755////////////////////////////////////////////////////////////////////////////////
3756/// Handle processing request.
/// Unpacks the data set, selector name, input list, options, entry range,
/// event/entry list and sync flag from 'mess'. On the top master the query is
/// registered and queued, then processed through ProcessNext(); elsewhere the
/// request is run directly through fPlayer and the outcome is reported on
/// fSocket.
/// NOTE(review): the signature line (listing line 3758) and a number of
/// statement lines are missing from this extract (gaps in the listing numbers).
3757
3759{
3760 PDB(kGlobal, 1)
3761 Info("HandleProcess", "Enter");
3762
3763 // Nothing to do for slaves if we are not idle
3764 if (!IsTopMaster() && !IsIdle())
3765 return;
3766
3767 TDSet *dset;
3768 TString filename, opt;
3769 TList *input;
3770 Long64_t nentries, first;
3771 TEventList *evl = 0;
3772 TEntryList *enl = 0;
3773 Bool_t sync;
3774
3775 (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
3776 // Get entry list information, if any (support started with fProtocol == 15)
3777 if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
3778 (*mess) >> enl;
3779 Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
3780
3781 // Priority to the entry list
3782 TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
3783 if (enl && evl)
3784 // Cannot specify both at the same time
3785 SafeDelete(evl);
3786 if ((!hasNoData) && elist)
3787 dset->SetEntryList(elist);
3788
3789 if (IsTopMaster()) {
3790
3791 TString emsg;
3792 // Make sure the dataset contains the information needed
3793 if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
3794 if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
3795 SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
3796 fPrefix.Data(), emsg.Data()));
3797 Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
3798 // To terminate collection
3799 if (sync) SendLogFile();
3800 return;
3801 }
3802 } else if (hasNoData) {
3803 // Check if we are required to process with TPacketizerFile a registered dataset
// NOTE(review): 'input' is dereferenced without a null check - presumably
// the message always carries a list; confirm
3804 TNamed *ftp = dynamic_cast<TNamed *>(input->FindObject("PROOF_FilesToProcess"));
3805 if (ftp) {
3806 TString dsn(ftp->GetTitle());
3807 if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
3808 dsn.ReplaceAll("dataset:", "");
3809 // Get the map for TPacketizerFile
3810 // Make sure we have something in input and a dataset manager
3811 if (!fDataSetManager) {
3812 emsg.Form("dataset manager not initialized!");
3813 } else {
3814 TFileCollection *fc = 0;
3815 // Get the dataset
3816 if (!(fc = fDataSetManager->GetDataSet(dsn))) {
3817 emsg.Form("requested dataset '%s' does not exists", dsn.Data());
3818 } else {
// Replace the named request by the per-server map under the same name
3819 TMap *fcmap = GetDataSetNodeMap(fc, emsg);
3820 if (fcmap) {
3821 input->Remove(ftp);
3822 delete ftp;
3823 fcmap->SetOwner(kTRUE);
3824 fcmap->SetName("PROOF_FilesToProcess");
3825 input->Add(fcmap);
3826 }
3827 }
3828 }
3829 if (!emsg.IsNull()) {
3830 SendAsynMessage(TString::Format("HandleProcess on %s: %s",
3831 fPrefix.Data(), emsg.Data()));
3832 Error("HandleProcess", "%s", emsg.Data());
3833 // To terminate collection
3834 if (sync) SendLogFile();
3835 return;
3836 }
3837 }
3838 }
3839 }
3840
3841 TProofQueryResult *pq = 0;
3842
3843 // Create instance of query results; we set ownership of the input list
3844 // to the TQueryResult object, to avoid too many instantiations
3845 pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
3846
3847 // Prepare the input list and transfer it into the TQueryResult object
3848 if (dset) input->Add(dset);
3849 if (elist) input->Add(elist);
3850 pq->SetInputList(input, kTRUE);
3851
3852 // Clear the list
3853 input->Clear("nodelete");
3855
3856 // Save input data, if any
3857 if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
3858 Warning("HandleProcess", "could not save input data: %s", emsg.Data());
3859
3860 // If not a draw action add the query to the main list
3861 if (!(pq->IsDraw())) {
3862 if (fQMgr) {
3863 if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
3864 // Also save it to queries dir
3865 fQMgr->SaveQuery(pq);
3866 }
3867 }
3868
3869 // Add anyhow to the waiting lists
3870 QueueQuery(pq);
3871
3872 // Call get Workers
3873 // if we are not idle the scheduler will just enqueue the query and
3874 // send a resume message later.
3875
3876 Bool_t enqueued = kFALSE;
3877 Int_t pc = 0;
3878 // if the session does not have workers and is in the dynamic mode
3879 if (fProof->UseDynamicStartup()) {
3880 // get the list of workers and start them
// NOTE(review): workerList is allocated here and not deleted in the
// visible code on any path - confirm who owns it
3881 TList* workerList = new TList();
3882 EQueryAction retVal = GetWorkers(workerList, pc);
3883 if (retVal == TProofServ::kQueryStop) {
3884 Error("HandleProcess", "error getting list of worker nodes");
3885 // To terminate collection
3886 if (sync) SendLogFile();
3887 return;
3888 } else if (retVal == TProofServ::kQueryEnqueued) {
3889 // change to an asynchronous query
3890 enqueued = kTRUE;
3891 Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3892 } else {
3893 Int_t ret = fProof->AddWorkers(workerList);
3894 if (ret < 0) {
3895 Error("HandleProcess", "Adding a list of worker nodes returned: %d",
3896 ret);
3897 // To terminate collection
3898 if (sync) SendLogFile();
3899 return;
3900 }
3901 }
3902 } else {
3903 EQueryAction retVal = GetWorkers(0, pc);
3904 if (retVal == TProofServ::kQueryStop) {
3905 Error("HandleProcess", "error getting list of worker nodes");
3906 // To terminate collection
3907 if (sync) SendLogFile();
3908 return;
3909 } else if (retVal == TProofServ::kQueryEnqueued) {
3910 // change to an asynchronous query
3911 enqueued = kTRUE;
3912 Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3913 } else if (retVal != TProofServ::kQueryOK) {
3914 Error("HandleProcess", "unknown return value: %d", retVal);
3915 // To terminate collection
3916 if (sync) SendLogFile();
3917 return;
3918 }
3919 }
3920
3921 // If the client submission was asynchronous, signal the submission of
3922 // the query and communicate the assigned sequential number for later
3923 // identification
3925 if (!sync || enqueued) {
3926 m << pq->GetSeqNum() << kFALSE;
3927 fSocket->Send(m);
3928 }
3929
3930 // Nothing more to do if we are not idle
3931 if (!IsIdle()) {
3932 // Notify submission
3933 Info("HandleProcess",
3934 "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
3935 return;
3936 }
3937
3938 // Process
3939 // in the static mode, if a session is enqueued it will be processed after current query
3940 // (there is no way to enqueue if idle).
3941 // in the dynamic mode we will process here only if the session was idle and got workers!
3942 Bool_t doprocess = kFALSE;
3943 while (WaitingQueries() > 0 && !enqueued) {
3944 doprocess = kTRUE;
3945 //
3946 ProcessNext(slb);
3947 // avoid processing async queries sent during processing in dyn mode
3949 enqueued = kTRUE;
3950
3951 } // Loop on submitted queries
3952
3953 // Set idle
3954 SetIdle(kTRUE);
3955
3956 // Reset mergers
3958
3959 // kPROOF_SETIDLE sets the client to idle; in asynchronous mode clients monitor
3960 // TProof::IsIdle to check the readiness of a query, so we need to send this
3961 // before, to be sure that everything about a query is received by the client
3962 if (!sync) SendLogFile();
3963
3964 // Signal the client that we are idle
3965 if (doprocess) {
3966 m.Reset(kPROOF_SETIDLE);
3967 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
3968 m << waiting;
3969 fSocket->Send(m);
3970 }
3971
3972 // In synchronous mode TProof::Collect is terminated by the reception of the
3973 // log file and subsequent submissions are controlled by TProof::IsIdle(), so
3974 // this must be last one to be sent
3975 if (sync) SendLogFile();
3976
3977 // Set idle
3978 SetIdle(kTRUE);
3979
3980 } else {
3981
3982 // Reset compute stopwatch: we include all what done from now on
3983 fCompute.Reset();
3984 fCompute.Start();
3985
3986 // Set not idle
3987 SetIdle(kFALSE);
3988
3989 // Cleanup the player
3990 Bool_t deleteplayer = kTRUE;
3991 MakePlayer();
3992
3993 // Setup data set
3994 if (dset && (dset->IsA() == TDSetProxy::Class()))
3995 ((TDSetProxy*)dset)->SetProofServ(this);
3996
3997 // Get input data, if any
3998 TString emsg;
3999 if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
4000 Warning("HandleProcess", "could not get input data: %s", emsg.Data());
4001
4002 // Get query sequential number
4003 if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
4004 Warning("HandleProcess", "could not get query sequential number!");
4005
4006 // Make the ordinal number available in the selector
// Any existing "PROOF_Ordinal" entries are removed (not deleted) first
4007 TObject *nord = 0;
4008 while ((nord = input->FindObject("PROOF_Ordinal")))
4009 input->Remove(nord);
4010 input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
4011
4012 // Set input
4013 TIter next(input);
4014 TObject *o = 0;
4015 while ((o = next())) {
4016 PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
4017 fPlayer->AddInput(o);
4018 }
4019
4020 // Check if a TSelector object is passed via input list
// The first such object wins and 'filename' is replaced by its class name
4021 TObject *obj = 0;
4022 TSelector *selector_obj = 0;
4023 TIter nxt(input);
4024 while ((obj = nxt())){
4025 if (obj->InheritsFrom("TSelector")) {
4026 selector_obj = (TSelector *) obj;
4027 filename = selector_obj->ClassName();
4028 Info("HandleProcess", "selector obj for '%s' found", selector_obj->ClassName());
4029 break;
4030 }
4031 }
4032
4033 // Signal the master that we are starting processing
4035
4036 // Reset latency stopwatch
4037 fLatency.Reset();
4039
4040 // Process
4041 PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
4042
4043 if (selector_obj){
4044 Info("HandleProcess", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4045 fPlayer->Process(dset, selector_obj, opt, nentries, first);
4046 }
4047 else {
4048 Info("HandleProcess", "calling fPlayer->Process() with selector name: %s", filename.Data());
4049 fPlayer->Process(dset, filename, opt, nentries, first);
4050 }
4051
4052 // Return number of events processed
4055 if (fProtocol > 18) {
4056 TProofProgressStatus* status =
4058 gPerfStats?gPerfStats->GetBytesRead():0);
// NOTE(review): 'status' is null-checked for the stream insertion below
// but dereferenced unconditionally in the slb->Form() call that follows
4059 if (status)
4060 m << status << abort;
4061 if (slb)
4062 slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
4063 status->GetEntries(), status->GetBytesRead());
4064 SafeDelete(status);
4065 } else {
4066 m << fPlayer->GetEventsProcessed() << abort;
4067 if (slb)
4068 slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
4069 }
4070
4071 fSocket->Send(m);
4072 PDB(kGlobal, 2)
4073 Info("TProofServ::Handleprocess",
4074 "worker %s has finished processing with %d objects in output list",
4076
4077 // Cleanup the input data set info
4078 SafeDelete(dset);
4079 SafeDelete(enl);
4080 SafeDelete(evl);
4081
4084 if (outok) {
4085 // Check if in controlled output sending mode or submerging
4086 Int_t cso = 0;
4087 Bool_t isSubMerging = kFALSE;
4088
4089 // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4090 Int_t nm = 0;
4091 if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
4092 isSubMerging = (nm >= 0) ? kTRUE : kFALSE;
4093 }
4094 if (!isSubMerging) {
// NOTE(review): cso gets the same gEnv default before and after the
// GetParameter() attempt; the first assignment looks redundant
4095 cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4096 if (TProof::GetParameter(input, "PROOF_ControlSendOutput", cso) != 0)
4097 cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4098 }
4099
4100 if (cso > 0) {
4101
4102 // Control output sending mode: wait for the master to ask for the objects.
4103 // Allows controls of memory usage on the master.
4105 fSocket->Send(msg);
4106
4107 // Set idle
4108 SetIdle(kTRUE);
4109
4110 // Do not cleanup the player yet: it will be used in sending output activities
4111 deleteplayer = kFALSE;
4112
4113 PDB(kGlobal, 1)
4114 Info("HandleProcess", "controlled mode: worker %s has finished,"
4115 " sizes sent to master", fOrdinal.Data());
4116 } else {
4117
4118 // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4120 if (isSubMerging)
4121 Info("HandleProcess", "submerging disabled because of high-memory case");
4122 isSubMerging = kFALSE;
4123 } else {
4124 PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isSubMerging);
4125 }
4126
4127 if (!IsMaster() && isSubMerging) {
4128 // Worker in merging mode.
4129 //----------------------------
4130 // First, it reports only the size of its output to the master
4131 // + port on which it can possibly accept outputs from other workers if it becomes a merger
4132 // Master will later tell it where it should send the output (either to the master or to some merger)
4133 // or if it should become a merger
4134
4135 TMessage msg_osize(kPROOF_SUBMERGER);
4136 msg_osize << Int_t(TProof::kOutputSize);
4137 msg_osize << fPlayer->GetOutputList()->GetEntries();
4138
// merge_port stays 0 when no merging socket is available
4140 Int_t merge_port = 0;
4141 if (fMergingSocket) {
4142 PDB(kGlobal, 2)
4143 Info("HandleProcess", "possible port for merging connections: %d",
4145 merge_port = fMergingSocket->GetLocalPort();
4146 }
4147 msg_osize << merge_port;
4148 fSocket->Send(msg_osize);
4149
4150 // Set idle
4151 SetIdle(kTRUE);
4152
4153 // Do not cleanup the player yet: it will be used in sub-merging activities
4154 deleteplayer = kFALSE;
4155
4156 PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
4157
4158 } else {
4159 // Sub-master OR worker not in merging mode
4160 // ---------------------------------------------
4161 PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
4163 Warning("HandleProcess","problems sending output list");
4164
4165 // Masters reset the mergers, if any
4166 if (IsMaster()) fProof->ResetMergers();
4167
4168 // Signal the master that we are idle
4170
4171 // Set idle
4172 SetIdle(kTRUE);
4173
4174 // Notify the user
4175 SendLogFile();
4176 }
4177
4178
4179
4180 }
4181
4182 } else {
4183 // No output list
4185 Warning("HandleProcess","the output list is empty!");
4186 if (SendResults(fSocket) != 0)
4187 Warning("HandleProcess", "problems sending output list");
4188
4189 // Masters reset the mergers, if any
4190 if (IsMaster()) fProof->ResetMergers();
4191
4192 // Signal the master that we are idle
4194
4195 // Set idle
4196 SetIdle(kTRUE);
4197
4198 // Notify the user
4199 SendLogFile();
4200 }
4201
4202 // Prevent from double-deleting in input
4203 TIter nex(input);
4204 while ((obj = nex())) {
4205 if (obj->InheritsFrom("TSelector")) input->Remove(obj);
4206 }
4207
4208 // Make also sure the input list objects are deleted
4210
4211 // Remove possible inputs from a file and the file, if any
4212 TList *added = dynamic_cast<TList *>(input->FindObject("PROOF_InputObjsFromFile"));
4213 if (added) {
4214 if (added->GetSize() > 0) {
4215 // The file must be the last one
4216 TFile *f = dynamic_cast<TFile *>(added->Last());
4217 if (f) {
// Detach the file-backed objects from 'input' before closing the file
4218 added->Remove(f);
4219 TIter nxo(added);
4220 while ((o = nxo())) { input->Remove(o); }
4221 input->Remove(added);
4222 added->SetOwner(kFALSE);
4223 added->Clear();
4224 f->Close();
4225 delete f;
4226 }
4227 }
4228 SafeDelete(added);
4229 }
4230 input->SetOwner();
4232
4233 // Cleanup if required
4234 if (deleteplayer) DeletePlayer();
4235 }
4236
4237 PDB(kGlobal, 1) Info("HandleProcess", "done");
4238
4239 // Done
4240 return;
4241}
4242
4243////////////////////////////////////////////////////////////////////////////////
4244/// Sends all objects from the given list to the specified socket
/// The wire format depends on fProtocol: bunches limited by fMsgSizeHWM
/// (fProtocol > 23), one object per message (fProtocol > 10), a single buffer
/// on the top master (fProtocol > 6), or SendObject() of the whole list.
/// In the first two formats each item is preceded by an Int_t tag: 0 for the
/// query info, 1 for an object, 2 for the last object.
/// Returns 0 on success, -1 as soon as a send on 'sock' fails.
/// NOTE(review): the signature line (listing line 4246) and the lines creating
/// 'mbuf' and setting up compression are missing from this extract.
4245
4247{
4248 PDB(kOutput, 2) Info("SendResults", "enter");
4249
4250 TString msg;
4251 if (fProtocol > 23 && outlist) {
4252 // Send objects in bunches of max fMsgSizeHWM bytes to optimize transfer
4253 // Objects are merged one-by-one by the client
4254 // Messages for objects
4256 // Objects in the output list
4257 Int_t olsz = outlist->GetSize();
4258 if (IsTopMaster() && pq) {
4259 msg.Form("%s: merging output objects ... done ",
4260 fPrefix.Data());
4261 SendAsynMessage(msg.Data());
4262 // Message for the client
4263 msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4264 SendAsynMessage(msg.Data(), kFALSE);
4265 // Send light query info
4266 mbuf << (Int_t) 0;
4267 mbuf.WriteObject(pq);
4268 if (sock->Send(mbuf) < 0) return -1;
4269 }
4270 // Objects in the output list
// ns counts objects seen so far, np those in the message being filled
4271 Int_t ns = 0, np = 0;
4272 TIter nxo(outlist);
4273 TObject *o = 0;
4274 Int_t totsz = 0, objsz = 0;
4275 mbuf.Reset();
4276 while ((o = nxo())) {
// Flush the pending message before adding the next object once it has
// grown beyond fMsgSizeHWM
4277 if (mbuf.Length() > fMsgSizeHWM) {
4278 PDB(kOutput, 1)
4279 Info("SendResults",
4280 "message has %d bytes: limit of %lld bytes reached - sending ...",
4281 mbuf.Length(), fMsgSizeHWM);
4282 // Compress the message, if required; for these messages we do it already
4283 // here so we get the size; TXSocket does not do it twice.
4284 if (GetCompressionLevel() > 0) {
4286 mbuf.Compress();
4287 objsz = mbuf.CompLength();
4288 } else {
4289 objsz = mbuf.Length();
4290 }
4291 totsz += objsz;
4292 if (IsTopMaster()) {
4293 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4294 fPrefix.Data(), ns, olsz, objsz);
4295 SendAsynMessage(msg.Data(), kFALSE);
4296 }
4297 if (sock->Send(mbuf) < 0) return -1;
4298 // Reset the message
4299 mbuf.Reset();
4300 np = 0;
4301 }
4302 ns++;
4303 np++;
4304 mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
4305 mbuf << o;
4306 }
// Send whatever is left in the last, partially filled message
4307 if (np > 0) {
4308 // Compress the message, if required; for these messages we do it already
4309 // here so we get the size; TXSocket does not do it twice.
4310 if (GetCompressionLevel() > 0) {
4312 mbuf.Compress();
4313 objsz = mbuf.CompLength();
4314 } else {
4315 objsz = mbuf.Length();
4316 }
4317 totsz += objsz;
4318 if (IsTopMaster()) {
4319 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4320 fPrefix.Data(), ns, olsz, objsz);
4321 SendAsynMessage(msg.Data(), kFALSE);
4322 }
4323 if (sock->Send(mbuf) < 0) return -1;
4324 }
4325 if (IsTopMaster()) {
4326 // Send total size
4327 msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4328 fPrefix.Data(), olsz, totsz);
4329 SendAsynMessage(msg.Data());
4330 }
4331 } else if (fProtocol > 10 && outlist) {
4332
4333 // Send objects one-by-one to optimize transfer and merging
4334 // Messages for objects
4336 // Objects in the output list
4337 Int_t olsz = outlist->GetSize();
4338 if (IsTopMaster() && pq) {
4339 msg.Form("%s: merging output objects ... done ",
4340 fPrefix.Data());
4341 SendAsynMessage(msg.Data());
4342 // Message for the client
4343 msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4344 SendAsynMessage(msg.Data(), kFALSE);
4345 // Send light query info
4346 mbuf << (Int_t) 0;
4347 mbuf.WriteObject(pq);
4348 if (sock->Send(mbuf) < 0) return -1;
4349 }
4350
4351 Int_t ns = 0;
4352 Int_t totsz = 0, objsz = 0;
// NOTE(review): this branch iterates fPlayer->GetOutputList() while olsz
// above comes from 'outlist' (the first branch iterates 'outlist') -
// confirm the two are always the same list
4353 TIter nxo(fPlayer->GetOutputList());
4354 TObject *o = 0;
4355 while ((o = nxo())) {
4356 ns++;
4357 mbuf.Reset();
4358 Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
4359 mbuf << type;
4360 mbuf.WriteObject(o);
4361 // Compress the message, if required; for these messages we do it already
4362 // here so we get the size; TXSocket does not do it twice.
4363 if (GetCompressionLevel() > 0) {
4365 mbuf.Compress();
4366 objsz = mbuf.CompLength();
4367 } else {
4368 objsz = mbuf.Length();
4369 }
4370 totsz += objsz;
4371 if (IsTopMaster()) {
4372 msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4373 fPrefix.Data(), ns, olsz, objsz);
4374 SendAsynMessage(msg.Data(), kFALSE);
4375 }
4376 if (sock->Send(mbuf) < 0) return -1;
4377 }
4378 // Total size
4379 if (IsTopMaster()) {
4380 // Send total size
4381 msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4382 fPrefix.Data(), olsz, totsz);
4383 SendAsynMessage(msg.Data());
4384 }
4385
4386 } else if (IsTopMaster() && fProtocol > 6 && outlist) {
4387
4388 // Buffer to be sent
4390 mbuf.WriteObject(pq);
4391 // Sizes
// NOTE(review): CompLength() is read here without a Compress() call in
// the visible code - confirm the reported size is meaningful
4392 Int_t blen = mbuf.CompLength();
4393 Int_t olsz = outlist->GetSize();
4394 // Message for the client
4395 msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
4396 SendAsynMessage(msg.Data(), kFALSE);
4397 if (sock->Send(mbuf) < 0) return -1;
4398
4399 } else {
// Remaining cases, including a null 'outlist', which is sent as is
4400 if (outlist) {
4401 PDB(kGlobal, 2) Info("SendResults", "sending output list");
4402 } else {
4403 PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
4404 }
4405 if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
4406 }
4407
4408 PDB(kOutput,2) Info("SendResults", "done");
4409
4410 // Done
4411 return 0;
4412}
4413
4414////////////////////////////////////////////////////////////////////////////////
4415/// Process the next query from the queue of submitted jobs.
4416/// To be called on the top master only.
4417
4419{
4420 TDSet *dset = 0;
4421 TString filename, opt;
4422 TList *input = 0;
4423 Long64_t nentries = -1, first = 0;
4424
4425 // TObject *elist = 0;
4426 TProofQueryResult *pq = 0;
4427
4428 TObject* obj = 0;
4429 TSelector* selector_obj = 0;
4430
4431 // Process
4432
4433 // Reset compute stopwatch: we include all what done from now on
4434 fCompute.Reset();
4435 fCompute.Start();
4436
4437 // Get next query info (also removes query from the list)
4438 pq = NextQuery();
4439 if (pq) {
4440
4441 // Set not idle
4442 SetIdle(kFALSE);
4443 opt = pq->GetOptions();
4444 input = pq->GetInputList();
4445 nentries = pq->GetEntries();
4446 first = pq->GetFirst();
4447 filename = pq->GetSelecImp()->GetName();
4448 Ssiz_t id = opt.Last('#');
4449 if (id != kNPOS && id < opt.Length() - 1) {
4450 filename += opt(id + 1, opt.Length());
4451 // Remove it from 'opt' so user found on the workers what they specified
4452 opt.Remove(id);
4453 }
4454 // Attach to data set and entry- (or event-) list (if any)
4455 TObject *o = 0;
4456 if ((o = pq->GetInputObject("TDSet"))) {
4457 dset = (TDSet *) o;
4458 } else {
4459 // Should never get here
4460 Error("ProcessNext", "no TDset object: cannot continue");
4461 return;
4462 }
4463 // elist = 0;
4464 // if ((o = pq->GetInputObject("TEntryList")))
4465 // elist = o;
4466 // else if ((o = pq->GetInputObject("TEventList")))
4467 // elist = o;
4468
4469 // Expand selector files
4470 if (pq->GetSelecImp()) {
4471 gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
4472 pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
4473 }
4474 if (pq->GetSelecHdr() &&
4475 !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
4476 gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
4477 pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
4478 }
4479
4480 // Taking out a TSelector object from input list
4481 TIter nxt(input);
4482 while ((obj = nxt())){
4483 if (obj->InheritsFrom("TSelector") &&
4484 !strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
4485 selector_obj = (TSelector *) obj;
4486 Info("ProcessNext", "found object for selector '%s'", obj->ClassName());
4487 break;
4488 }
4489 }
4490
4491 } else {
4492 // Should never get here
4493 Error("ProcessNext", "empty waiting queries list!");
4494 return;
4495 }
4496
4497 // Set in running state
4498 SetQueryRunning(pq);
4499
4500 // Save to queries dir, if not standard draw
4501 if (fQMgr) {
4502 if (!(pq->IsDraw()))
4503 fQMgr->SaveQuery(pq);
4504 else
4506 fQMgr->ResetTime();
4507 }
4508
4509 // Signal the client that we are starting a new query
4511 m << TString(pq->GetSelecImp()->GetName())
4512 << dset->GetNumOfFiles()
4513 << pq->GetFirst() << pq->GetEntries();
4514 fSocket->Send(m);
4515
4516 // Create player
4517 MakePlayer();
4518
4519 // Add query results to the player lists
4521
4522 // Set query currently processed
4524
4525 // Setup data set
4526 if (dset->IsA() == TDSetProxy::Class())
4527 ((TDSetProxy*)dset)->SetProofServ(this);
4528
4529 // Add the unique query tag as TNamed object to the input list
4530 // so that it is available in TSelectors for monitoring
4531 TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
4532 input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
4533 // ... and the sequential number
4534 fQuerySeqNum = pq->GetSeqNum();
4535 input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
4536
4537 // Check whether we have to enforce the use of submergers, but only if the user did
4538 // not express itself on the subject
4539 if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
4540 Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
4541 if (smg >= 0) {
4542 input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
4543 PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
4544 if (gEnv->Lookup("Proof.MergersByHost")) {
4545 Int_t mbh = gEnv->GetValue("Proof.MergersByHost", 0);
4546 if (mbh != 0) {
4547 // Administrator settings have the priority
4548 TObject *o = 0;
4549 if ((o = input->FindObject("PROOF_MergersByHost"))) { input->Remove(o); delete o; }
4550 input->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
4551 PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
4552 }
4553 }
4554 }
4555 }
4556
4557 // Set input
4558 TIter next(input);
4559 TObject *o = 0;
4560 while ((o = next())) {
4561 PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
4562 fPlayer->AddInput(o);
4563 }
4564
4565 // Remove the list of the missing files from the original list, if any
4566 if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
4567
4568 // Process
4569 PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
4570 if (selector_obj){
4571 Info("ProcessNext", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4572 fPlayer->Process(dset, selector_obj, opt, nentries, first);
4573 }
4574 else {
4575 Info("ProcessNext", "calling fPlayer->Process() with selector name: %s", filename.Data());
4576 fPlayer->Process(dset, filename, opt, nentries, first);
4577 }
4578
4579 // This is the end of merging
4581
4582 // Return number of events processed
4583 Bool_t abort =
4586 m.Reset(kPROOF_STOPPROCESS);
4587 // message sent from worker to the master
4588 if (fProtocol > 18) {
4590 m << status << abort;
4591 status = 0; // the status belongs to the player.
4592 } else if (fProtocol > 8) {
4593 m << fPlayer->GetEventsProcessed() << abort;
4594 } else {
4596 }
4597 fSocket->Send(m);
4598 }
4599
4600 // Register any dataset produced during this processing, if required
4602 TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
4603 if (psr) {
4604 TString emsg;
4606 Warning("ProcessNext", "problems registering produced datasets: %s", emsg.Data());
4607 do {
4608 fPlayer->GetOutputList()->Remove(psr);
4609 delete psr;
4610 } while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet")));
4611 }
4612 }
4613
4614 // Complete filling of the TQueryResult instance
4615 if (fQMgr && !pq->IsDraw()) {
4616 if (!abort) fProof->AskStatistics();
4617 if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
4619 }
4620
4621 // If we were requested to save results on the master and we are not in save-to-file mode
4622 // then we save the results
4623 if (IsTopMaster() && fPlayer->GetOutputList()) {
4624 Bool_t save = kTRUE;
4625 TIter nxo(fPlayer->GetOutputList());
4626 TObject *xo = 0;
4627 while ((xo = nxo())) {
4628 if (xo->InheritsFrom("TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
4629 save = kFALSE;
4630 break;
4631 }
4632 }
4633 if (save) {
4634 TNamed *nof = (TNamed *)