Logo ROOT  
Reference Guide
TProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Fons Rademakers 16/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofServ
13 \ingroup proofkernel
14 
15 Class providing the PROOF server. It can act either as the master
16 server or as a slave server, depending on its startup arguments. It
17 receives and handles messages coming from the client or from the
18 master server.
19 
20 */
21 
22 #include "RConfigure.h"
23 #include <ROOT/RConfig.hxx>
24 #include "Riostream.h"
25 
26 #ifdef WIN32
27  #include <process.h>
28  #include <io.h>
29  #include "snprintf.h"
30  typedef long off_t;
31 #endif
32 #include <errno.h>
33 #include <time.h>
34 #include <fcntl.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #ifndef WIN32
38 #include <sys/wait.h>
39 #endif
40 #include <cstdlib>
41 
42 // To handle exceptions
43 #include <exception>
44 #include <new>
45 
46 using namespace std;
47 
48 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
49  (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
50  (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
51 #include <sys/file.h>
52 #define lockf(fd, op, sz) flock((fd), (op))
53 #ifndef F_LOCK
54 #define F_LOCK (LOCK_EX | LOCK_NB)
55 #endif
56 #ifndef F_ULOCK
57 #define F_ULOCK LOCK_UN
58 #endif
59 #endif
60 
61 #include "TProofServ.h"
62 #include "TDSetProxy.h"
63 #include "TEnv.h"
64 #include "TError.h"
65 #include "TEventList.h"
66 #include "TEntryList.h"
67 #include "TException.h"
68 #include "TFile.h"
69 #include "THashList.h"
70 #include "TInterpreter.h"
71 #include "TKey.h"
72 #include "TMessage.h"
73 #include "TVirtualPerfStats.h"
74 #include "TProofDebug.h"
75 #include "TProof.h"
76 #include "TVirtualProofPlayer.h"
77 #include "TProofQueryResult.h"
78 #include "TQueryResultManager.h"
79 #include "TRegexp.h"
80 #include "TROOT.h"
81 #include "TObjArray.h"
82 #include "TSocket.h"
83 #include "TStopwatch.h"
84 #include "TSystem.h"
85 #include "TTimeStamp.h"
86 #include "TUrl.h"
87 #include "TPluginManager.h"
88 #include "TObjString.h"
89 #include "compiledata.h"
90 #include "TProofResourcesStatic.h"
91 #include "TProofNodeInfo.h"
92 #include "TFileInfo.h"
93 #include "TClass.h"
94 #include "TSQLServer.h"
95 #include "TSQLResult.h"
96 #include "TSQLRow.h"
97 #include "TPRegexp.h"
98 #include "TParameter.h"
99 #include "TMap.h"
100 #include "TSortedList.h"
101 #include "TFileCollection.h"
102 #include "TLockFile.h"
103 #include "TDataSetManagerFile.h"
104 #include "TProofProgressStatus.h"
105 #include "TServerSocket.h"
106 #include "TMonitor.h"
107 #include "TProofOutputFile.h"
108 #include "TSelector.h"
109 #include "TPackMgr.h"
110 
111 // global proofserv handle
113 
114 // debug hook
115 static volatile Int_t gProofServDebug = 1;
116 
117 // Syslog control
120 TString TProofServ::fgSysLogEntity("undef:default");
121 
122 // File where to log: default stderr
124 
125 // Integrate with crash reporter.
126 #ifdef __APPLE__
127 extern "C" {
128 static const char *__crashreporter_info__ = 0;
129 asm(".desc ___crashreporter_info__, 0x10");
130 }
131 #endif
132 
133 // To control allowed actions while processing
135 
136 // Last message and entry before exceptions
137 TString TProofServ::fgLastMsg("<undef>");
139 
140 // Memory controllers
145 
146 // Async Logger
147 static void SendAsynMsg(const char *msg) {
149 }
150 
151 //----- Termination signal handler ---------------------------------------------
152 ////////////////////////////////////////////////////////////////////////////////
153 
154 class TProofServTerminationHandler : public TSignalHandler {
155  TProofServ *fServ;
156 public:
157  TProofServTerminationHandler(TProofServ *s)
158  : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
159  Bool_t Notify();
160 };
161 
162 ////////////////////////////////////////////////////////////////////////////////
163 /// Handle this interrupt
164 
165 Bool_t TProofServTerminationHandler::Notify()
166 {
167  Printf("Received SIGTERM: terminating");
168  fServ->HandleTermination();
169  return kTRUE;
170 }
171 
172 //----- Interrupt signal handler -----------------------------------------------
173 ////////////////////////////////////////////////////////////////////////////////
174 
175 class TProofServInterruptHandler : public TSignalHandler {
176  TProofServ *fServ;
177 public:
178  TProofServInterruptHandler(TProofServ *s)
179  : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
180  Bool_t Notify();
181 };
182 
183 ////////////////////////////////////////////////////////////////////////////////
184 /// Handle this interrupt
185 
186 Bool_t TProofServInterruptHandler::Notify()
187 {
188  fServ->HandleUrgentData();
189  if (TROOT::Initialized()) {
190  Throw(GetSignal());
191  }
192  return kTRUE;
193 }
194 
195 //----- SigPipe signal handler -------------------------------------------------
196 ////////////////////////////////////////////////////////////////////////////////
197 
198 class TProofServSigPipeHandler : public TSignalHandler {
199  TProofServ *fServ;
200 public:
201  TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
202  { fServ = s; }
203  Bool_t Notify();
204 };
205 
206 ////////////////////////////////////////////////////////////////////////////////
207 /// Handle this signal
208 
209 Bool_t TProofServSigPipeHandler::Notify()
210 {
211  fServ->HandleSigPipe();
212  return kTRUE;
213 }
214 
215 //----- Input handler for messages from parent or master -----------------------
216 ////////////////////////////////////////////////////////////////////////////////
217 
218 class TProofServInputHandler : public TFileHandler {
219  TProofServ *fServ;
220 public:
221  TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
222  { fServ = s; }
223  Bool_t Notify();
224  Bool_t ReadNotify() { return Notify(); }
225 };
226 
227 ////////////////////////////////////////////////////////////////////////////////
228 /// Handle this input
229 
230 Bool_t TProofServInputHandler::Notify()
231 {
232  fServ->HandleSocketInput();
233  return kTRUE;
234 }
235 
236 TString TProofServLogHandler::fgPfx = ""; // Default prefix to be prepended to messages
237 Int_t TProofServLogHandler::fgCmdRtn = 0; // Return code of the command execution (available only
238  // after closing the pipe)
239 ////////////////////////////////////////////////////////////////////////////////
240 /// Execute 'cmd' in a pipe and handle output messages from the related file
241 
243  TSocket *s, const char *pfx)
244  : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
245 {
247  fgCmdRtn = 0;
248  fFile = 0;
249  if (s && cmd) {
250  fFile = gSystem->OpenPipe(cmd, "r");
251  if (fFile) {
252  SetFd(fileno(fFile));
253  // Notify what already in the file
254  Notify();
255  // Used in the destructor
257  } else {
258  fSocket = 0;
259  Error("TProofServLogHandler", "executing command in pipe");
260  fgCmdRtn = -1;
261  }
262  } else {
263  Error("TProofServLogHandler",
264  "undefined command (%p) or socket (%p)", (int *)cmd, s);
265  }
266 }
267 ////////////////////////////////////////////////////////////////////////////////
268 /// Handle available message from the open file 'f'
269 
271  : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
272 {
274  fgCmdRtn = 0;
275  fFile = 0;
276  if (s && f) {
277  fFile = f;
278  SetFd(fileno(fFile));
279  // Notify what already in the file
280  Notify();
281  } else {
282  Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
283  }
284 }
285 ////////////////////////////////////////////////////////////////////////////////
286 /// Handle available message in the open file
287 
289 {
290  if (TestBit(kFileIsPipe) && fFile) {
291  Int_t rc = gSystem->ClosePipe(fFile);
292 #ifdef WIN32
293  fgCmdRtn = rc;
294 #else
295  fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
296 #endif
297  }
298  fFile = 0;
299  fSocket = 0;
301 }
302 ////////////////////////////////////////////////////////////////////////////////
303 /// Handle available message in the open file
304 
306 {
307  if (IsValid()) {
309  // Read buffer
310  char line[4096];
311  char *plf = 0;
312  while (fgets(line, sizeof(line), fFile)) {
313  if ((plf = strchr(line, '\n')))
314  *plf = 0;
315  // Create log string
316  TString log;
317  if (fPfx.Length() > 0) {
318  // Prepend prefix specific to this instance
319  log.Form("%s: %s", fPfx.Data(), line);
320  } else if (fgPfx.Length() > 0) {
321  // Prepend default prefix
322  log.Form("%s: %s", fgPfx.Data(), line);
323  } else {
324  // Nothing to prepend
325  log = line;
326  }
327  // Send the message one level up
328  m.Reset(kPROOF_MESSAGE);
329  m << log;
330  fSocket->Send(m);
331  }
332  }
333  return kTRUE;
334 }
335 ////////////////////////////////////////////////////////////////////////////////
336 /// Static method to set the default prefix
337 
339 {
340  fgPfx = pfx;
341 }
342 ////////////////////////////////////////////////////////////////////////////////
343 /// Static method to get the return code from the execution of a command via
344 /// the pipe. This is always 0 when the log handler is not used with a pipe
345 
347 {
348  return fgCmdRtn;
349 }
350 
351 ////////////////////////////////////////////////////////////////////////////////
352 /// Init a guard for executing a command in a pipe
353 
355  const char *pfx, Bool_t on)
356 {
357  fExecHandler = 0;
358  if (cmd && on) {
359  fExecHandler = new TProofServLogHandler(cmd, s, pfx);
360  if (fExecHandler->IsValid()) {
362  } else {
363  Error("TProofServLogHandlerGuard","invalid handler");
364  }
365  } else {
366  if (on)
367  Error("TProofServLogHandlerGuard","undefined command");
368  }
369 }
370 
371 ////////////////////////////////////////////////////////////////////////////////
372 /// Init a guard for executing a command in a pipe
373 
375  const char *pfx, Bool_t on)
376 {
377  fExecHandler = 0;
378  if (f && on) {
379  fExecHandler = new TProofServLogHandler(f, s, pfx);
380  if (fExecHandler->IsValid()) {
382  } else {
383  Error("TProofServLogHandlerGuard","invalid handler");
384  }
385  } else {
386  if (on)
387  Error("TProofServLogHandlerGuard","undefined file");
388  }
389 }
390 
391 ////////////////////////////////////////////////////////////////////////////////
392 /// Close a guard for executing a command in a pipe
393 
395 {
396  if (fExecHandler && fExecHandler->IsValid()) {
399  }
400 }
401 
402 //--- Special timer to control delayed shutdowns ----------------------------//
403 ////////////////////////////////////////////////////////////////////////////////
404 /// Constructor
405 
407  : TTimer(delay, kFALSE), fProofServ(p)
408 {
409  fTimeout = gEnv->GetValue("ProofServ.ShutdownTimeout", 20);
410  // Backward compatibility: until 5.32 the variable was called ProofServ.ShutdonwTimeout
411  fTimeout = gEnv->GetValue("ProofServ.ShutdonwTimeout", fTimeout);
412 }
413 
414 ////////////////////////////////////////////////////////////////////////////////
415 /// Handle expiration of the shutdown timer. In the case of low activity the
416 /// process will be aborted.
417 
419 {
420  if (gDebug > 0)
421  printf("TShutdownTimer::Notify: checking activity on the input socket\n");
422 
423  // Check activity on the socket
424  TSocket *xs = 0;
425  if (fProofServ && (xs = fProofServ->GetSocket())) {
426  TTimeStamp now;
427  TTimeStamp ts = xs->GetLastUsage();
428  Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
429  (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
430  if (dt > fTimeout * 60000) {
431  printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
432  " during the last %d mins: aborting\n", xs, fTimeout);
433  // At this point we lost our controller: we need to abort to avoid
434  // hidden timeouts or loops
435  gSystem->Abort();
436  } else {
437  if (gDebug > 0)
438  printf("TShutdownTimer::Notify: input socket: %p: show activity"
439  " %ld secs ago\n", xs, dt / 60000);
440  }
441  }
442  // Needed for the next shot
443  Reset();
444  return kTRUE;
445 }
446 
447 //--- Synchronous timer used to reap children processes change of state ------//
448 ////////////////////////////////////////////////////////////////////////////////
449 /// Destructor
450 
452 {
453  if (fChildren) {
455  delete fChildren;
456  fChildren = 0;
457  }
458 }
459 
460 ////////////////////////////////////////////////////////////////////////////////
461 /// Add an entry for 'pid' in the internal list
462 
464 {
465  if (pid > 0) {
466  if (!fChildren)
467  fChildren = new TList;
468  TString spid;
469  spid.Form("%d", pid);
470  fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
471  TurnOn();
472  }
473 }
474 
475 ////////////////////////////////////////////////////////////////////////////////
476 /// Check if any of the registered children has changed its state.
477 /// Unregister those that are gone.
478 
480 {
481  if (fChildren) {
482  TIter nxp(fChildren);
483  TParameter<Int_t> *p = 0;
484  while ((p = (TParameter<Int_t> *)nxp())) {
485  int status;
486 #ifndef WIN32
487  pid_t pid;
488  do {
489  pid = waitpid(p->GetVal(), &status, WNOHANG);
490  } while (pid < 0 && errno == EINTR);
491 #else
492  intptr_t pid;
493  pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
494 #endif
495  if (pid > 0 && pid == p->GetVal()) {
496  // Remove from the list
497  fChildren->Remove(p);
498  delete p;
499  }
500  }
501  }
502 
503  // Stop the timer if no children
504  if (!fChildren || fChildren->GetSize() <= 0) {
505  Stop();
506  } else {
507  // Needed for the next shot
508  Reset();
509  }
510  return kTRUE;
511 }
512 
513 //--- Special timer to terminate idle sessions ----------------------------//
514 ////////////////////////////////////////////////////////////////////////////////
515 /// Handle expiration of the idle timer. The session will just be terminated.
516 
518 {
519  Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
520 
521  if (fProofServ) {
522  // Set the status to timed-out
523  Int_t uss_rc = -1;
524  if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
525  Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
526  // Send a terminate request
527  TString msg;
528  if (fProofServ->GetProtocol() < 29) {
529  msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
530  "// Please IGNORE any error message possibly displayed below\n//",
532  } else {
533  msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
535  }
537  fProofServ->Terminate(0);
538  Reset();
539  Stop();
540  } else {
541  Warning("Notify", "fProofServ undefined!");
542  Start(-1, kTRUE);
543  }
544  return kTRUE;
545 }
546 
548 
549 // Hook to the constructor. This is needed to avoid using the plugin manager
550 // which may create problems in multi-threaded environments.
551 extern "C" {
552  TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
553  { return new TProofServ(argc, argv, flog); }
554 }
555 
556 ////////////////////////////////////////////////////////////////////////////////
557 /// Main constructor. Create an application environment. The TProofServ
558 /// environment provides an eventloop via inheritance of TApplication.
559 /// Actual server creation work is done in CreateServer() to allow
560 /// overloading.
561 
562 TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
563  : TApplication("proofserv", argc, argv, 0, -1)
564 {
565  // If test and tty, we are done
566  Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
567  if (xtest) {
568  Printf("proofserv: command line testing: OK");
569  exit(0);
570  }
571 
572  // Read session specific rootrc file
573  TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
574  : "session.rootrc";
575  if (!gSystem->AccessPathName(rcfile, kReadPermission))
576  gEnv->ReadFile(rcfile, kEnvChange);
577 
578  // Upper limit on Virtual Memory (in kB)
579  fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
580  if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
581  Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
582  if (mmx < kMaxLong && mmx > 0)
583  fgVirtMemMax = mmx * 1024;
584  }
585  // Old variable for backward compatibility
586  if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
587  Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
588  if (mmx < kMaxLong && mmx > 0)
589  fgVirtMemMax = mmx * 1024;
590  }
591  // Upper limit on Resident Memory (in kB)
592  fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
593  if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
594  Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
595  if (mmx < kMaxLong && mmx > 0)
596  fgResMemMax = mmx * 1024;
597  }
598  // Thresholds for warnings and stop processing
599  fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
600  fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
601  if (fgVirtMemMax > 0 || fgResMemMax > 0) {
602  if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
603  Warning("TProofServ", "requested memory fraction threshold to stop processing"
604  " (MemStop) out of range [0,1] - ignoring");
605  fgMemStop = 0.95;
606  }
607  if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
608  Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
609  " (MemHWM) out of range [0,MemStop] - ignoring");
610  fgMemHWM = 0.80;
611  }
612  }
613 
614  // Wait (loop) to allow debugger to connect
615  Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
616  if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
617  (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
618  while (gProofServDebug)
619  ;
620  }
621 
622  // Test instance
623  if (argc && *argc >= 4)
624  if (!strcmp(argv[3], "test"))
625  fService = "prooftest";
626 
627  // crude check on number of arguments
628  if (argc && *argc < 2) {
629  Error("TProofServ", "Must have at least 1 arguments (see proofd).");
630  exit(1);
631  }
632 
633  // Set global to this instance
634  gProofServ = this;
635 
636  // Log control flags
638 
639  // Abort on higher than kSysError's and set error handler
641  SetErrorHandlerFile(stderr);
643 
644  fNcmd = 0;
645  fGroupPriority = 100;
646  fInterrupt = kFALSE;
647  fProtocol = 0;
648  fOrdinal = gEnv->GetValue("ProofServ.Ordinal", "-1");
649  fGroupId = -1;
650  fGroupSize = 0;
651  fRealTime = 0.0;
652  fCpuTime = 0.0;
653  fProof = 0;
654  fPlayer = 0;
655  fSocket = 0;
656 
657  fTotSessions = -1;
658  fActSessions = -1;
659  fEffSessions = -1.;
660  fPackMgr = 0;
661 
662  fLogFile = flog;
663  fLogFileDes = -1;
664 
665  fArchivePath = "";
666  // Init lockers
667  fCacheLock = 0;
668  fQueryLock = 0;
669 
670  fQMgr = 0;
671  fWaitingQueries = new TList;
672  fIdle = kTRUE;
673  fQuerySeqNum = -1;
674 
675  fQueuedMsg = new TList;
676 
678 
679  fShutdownTimer = 0;
680  fReaperTimer = 0;
681  fIdleTOTimer = 0;
682 
683  fDataSetManager = 0; // Initialized in Setup()
684  fDataSetStgRepo = 0; // Initialized in Setup()
685 
686  fInputHandler = 0;
687 
688  // Quotas disabled by default
689  fMaxQueries = -1;
690  fMaxBoxSize = -1;
691  fHWMBoxSize = -1;
692 
693  // Submerger quantities
694  fMergingSocket = 0;
695  fMergingMonitor = 0;
696  fMergedWorkers = 0;
697 
698  // Bit to flg high-memory footprint
700 
701  // Max message size
702  fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
703 
704  // Message compression
705  fCompressMsg = gEnv->GetValue("ProofServ.CompressMessage", 0);
706 
707  gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
709 
710  gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue("Proof.DebugMask",~0);
711  if (gProofDebugLevel > 0)
712  Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
713 
714  // Max log file size
715  fLogFileMaxSize = -1;
716  TString logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
717  if (!logmx.IsNull()) {
718  Long64_t xf = 1;
719  if (!logmx.IsDigit()) {
720  if (logmx.EndsWith("K")) {
721  xf = 1024;
722  logmx.Remove(TString::kTrailing, 'K');
723  } else if (logmx.EndsWith("M")) {
724  xf = 1024*1024;
725  logmx.Remove(TString::kTrailing, 'M');
726  } else if (logmx.EndsWith("G")) {
727  xf = 1024*1024*1024;
728  logmx.Remove(TString::kTrailing, 'G');
729  }
730  }
731  if (logmx.IsDigit()) {
732  fLogFileMaxSize = logmx.Atoi() * xf;
733  if (fLogFileMaxSize > 0)
734  Info("TProofServ", "keeping the log file size within %lld bytes", fLogFileMaxSize);
735  } else {
736  logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
737  Warning("TProofServ", "bad formatted log file size limit ignored: '%s'", logmx.Data());
738  }
739  }
740 
741  // Parse options
742  GetOptions(argc, argv);
743 
744  // Default prefix in the form '<role>-<ordinal>'
745  fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
746  if (test) fPrefix = "Test";
747  if (fOrdinal != "-1")
748  fPrefix += fOrdinal;
750 
751  // Syslog control
752  TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
753  if (!(slog.IsNull())) {
754  if (slog.IsDigit()) {
755  fgLogToSysLog = slog.Atoi();
756  } else {
757  char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
758  c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
759  Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
760  (c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
761  if (dosyslog) {
762  slog.Remove(0,1);
763  if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
764  if (fgLogToSysLog <= 0)
765  Warning("TProofServ", "request for syslog logging ineffective!");
766  }
767  }
768  }
769  // Initialize proper service if required
770  if (fgLogToSysLog > 0) {
771  fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
772  if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
774  }
775 
776  // Enable optimized sending of streamer infos to use embedded backward/forward
777  // compatibility support between different ROOT versions and different versions of
778  // users classes
779  Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
780  if (enableSchemaEvolution) {
782  } else {
783  Info("TProofServ", "automatic schema evolution in TMessage explicitly disabled");
784  }
785 }
786 
787 ////////////////////////////////////////////////////////////////////////////////
788 /// Finalize the server setup. If master, create the TProof instance to talk
789 /// to the worker or submaster nodes.
790 /// Return 0 on success, -1 on error
791 
793 {
794  // Get socket to be used (setup in proofd)
795  TString opensock = gSystem->Getenv("ROOTOPENSOCK");
796  if (opensock.Length() <= 0)
797  opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
798  Int_t sock = opensock.Atoi();
799  if (sock <= 0) {
800  Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
801  return -1;
802  }
803  fSocket = new TSocket(sock);
804 
805  // Set compression level, if any
807 
808  // debug hooks
809  if (IsMaster()) {
810  // wait (loop) in master to allow debugger to connect
811  if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
812  while (gProofServDebug)
813  ;
814  }
815  } else {
816  // wait (loop) in slave to allow debugger to connect
817  if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
818  while (gProofServDebug)
819  ;
820  }
821  }
822 
823  if (gProofDebugLevel > 0)
824  Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
826 
827  if (Setup() != 0) {
828  // Setup failure
829  LogToMaster();
830  SendLogFile();
831  Terminate(0);
832  return -1;
833  }
834 
835  // Set the default prefix in the form '<role>-<ordinal>' (it was already done
836  // in the constructor, but for standard PROOF the ordinal number is only set in
837  // Setup(), so we need to do it again here)
838  TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
839  pfx += GetOrdinal();
841 
842  if (!fLogFile) {
843  RedirectOutput();
844  // If for some reason we failed setting a redirection file for the logs
845  // we cannot continue
846  if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
847  LogToMaster();
848  SendLogFile(-98);
849  Terminate(0);
850  return -1;
851  }
852  } else {
853  // Use the file already open by pmain
854  if ((fLogFileDes = fileno(fLogFile)) < 0) {
855  LogToMaster();
856  SendLogFile(-98);
857  Terminate(0);
858  return -1;
859  }
860  }
861 
862  // Send message of the day to the client
863  if (IsMaster()) {
864  if (CatMotd() == -1) {
865  LogToMaster();
866  SendLogFile(-99);
867  Terminate(0);
868  return -1;
869  }
870  }
871 
872  // Everybody expects std::iostream to be available, so load it...
873  ProcessLine("#include <iostream>", kTRUE);
874  ProcessLine("#include <string>",kTRUE); // for std::string std::iostream.
875 
876  // The following libs are also useful to have, make sure they are loaded...
877  //gROOT->LoadClass("TMinuit", "Minuit");
878  //gROOT->LoadClass("TPostScript", "Postscript");
879 
880  // Load user functions
881  const char *logon;
882  logon = gEnv->GetValue("Proof.Load", (char *)0);
883  if (logon) {
884  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
885  if (mac)
886  ProcessLine(TString::Format(".L %s", logon), kTRUE);
887  delete [] mac;
888  }
889 
890  // Execute logon macro
891  logon = gEnv->GetValue("Proof.Logon", (char *)0);
892  if (logon && !NoLogOpt()) {
893  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
894  if (mac)
895  ProcessFile(logon);
896  delete [] mac;
897  }
898 
899  // Save current interpreter context
900  gInterpreter->SaveContext();
901  gInterpreter->SaveGlobalsContext();
902 
903  // Install interrupt and message input handlers
904  gSystem->AddSignalHandler(new TProofServTerminationHandler(this));
905  gSystem->AddSignalHandler(new TProofServInterruptHandler(this));
906  fInputHandler = new TProofServInputHandler(this, sock);
908 
909  // if master, start slave servers
910  if (IsMaster()) {
911  TString master = "proof://__master__";
913  if (a.IsValid()) {
914  master += ":";
915  master += a.GetPort();
916  }
917 
918  // Get plugin manager to load appropriate TProof from
919  TPluginManager *pm = gROOT->GetPluginManager();
920  if (!pm) {
921  Error("CreateServer", "no plugin manager found");
922  SendLogFile(-99);
923  Terminate(0);
924  return -1;
925  }
926 
927  // Find the appropriate handler
928  TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
929  if (!h) {
930  Error("CreateServer", "no plugin found for TProof with a"
931  " config file of '%s'", fConfFile.Data());
932  SendLogFile(-99);
933  Terminate(0);
934  return -1;
935  }
936 
937  // load the plugin
938  if (h->LoadPlugin() == -1) {
939  Error("CreateServer", "plugin for TProof could not be loaded");
940  SendLogFile(-99);
941  Terminate(0);
942  return -1;
943  }
944 
945  // make instance of TProof
946  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
947  fConfFile.Data(),
948  GetConfDir(),
949  fLogLevel, 0));
950  if (!fProof || !fProof->IsValid()) {
951  Error("CreateServer", "plugin for TProof could not be executed");
953  SendLogFile(-99);
954  Terminate(0);
955  return -1;
956  }
957  // Find out if we are a master in direct contact only with workers
959 
960  SendLogFile();
961  }
962 
963  // Setup the shutdown timer
964  if (!fShutdownTimer) {
965  // Check activity on socket every 5 mins
966  fShutdownTimer = new TShutdownTimer(this, 300000);
968  }
969 
970  // Check if schema evolution is effective: clients running versions <=17 do not
971  // support that: send a warning message
972  if (fProtocol <= 17) {
973  TString msg;
974  msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
975  " This may generate compatibility problems between streamed objects.\n"
976  " The advise is to move to ROOT >= 5.21/02 .");
977  SendAsynMessage(msg.Data());
978  }
979 
980  // Setup the idle timer
981  if (IsMaster() && !fIdleTOTimer) {
982  // Check activity on socket every 5 mins
983  Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
984  if (idle_to > 0) {
985  fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
986  fIdleTOTimer->Start(-1, kTRUE);
987  if (gProofDebugLevel > 0)
988  Info("CreateServer", " idle timer started (%d secs)", idle_to);
989  } else if (gProofDebugLevel > 0) {
990  Info("CreateServer", " idle timer not started (no idle timeout requested)");
991  }
992  }
993 
994  // Done
995  return 0;
996 }
997 
998 ////////////////////////////////////////////////////////////////////////////////
999 /// Cleanup. Not really necessary since after this dtor there is no
1000 /// life anyway.
1001 
1003 {
1011  close(fLogFileDes);
1012 }
1013 
1014 ////////////////////////////////////////////////////////////////////////////////
1015 /// Print message of the day (in the file pointed by the env PROOFMOTD
1016 /// or from fConfDir/etc/proof/motd). The motd is not shown more than
1017 /// once a day. If the file pointed by env PROOFNOPROOF exists (or the
1018 /// file fConfDir/etc/proof/noproof exists), show its contents and close
1019 /// the connection.
1020 
1022 {
1023  TString lastname;
1024  FILE *motd;
1025  Bool_t show = kFALSE;
1026 
1027  // If we are disabled just print the message and close the connection
1028  TString motdname(GetConfDir());
1029  // The env variable PROOFNOPROOF allows to put the file in an alternative
1030  // location not overwritten by a new installation
1031  if (gSystem->Getenv("PROOFNOPROOF")) {
1032  motdname = gSystem->Getenv("PROOFNOPROOF");
1033  } else {
1034  motdname += "/etc/proof/noproof";
1035  }
1036  if ((motd = fopen(motdname, "r"))) {
1037  Int_t c;
1038  printf("\n");
1039  while ((c = getc(motd)) != EOF)
1040  putchar(c);
1041  fclose(motd);
1042  printf("\n");
1043 
1044  return -1;
1045  }
1046 
1047  // get last modification time of the file ~/proof/.prooflast
1048  lastname = TString(GetWorkDir()) + "/.prooflast";
1049  gSystem->ExpandPathName(lastname);
1050  Long64_t size;
1051  Long_t id, flags, modtime, lasttime = 0;
1052  if (gSystem->GetPathInfo(lastname.Data(), &id, &size, &flags, &lasttime) == 1)
1053  lasttime = 0;
1054 
1055  // show motd at least once per day
1056  if (time(0) - lasttime > (time_t)86400)
1057  show = kTRUE;
1058 
1059  // The env variable PROOFMOTD allows to put the file in an alternative
1060  // location not overwritten by a new installation
1061  if (gSystem->Getenv("PROOFMOTD")) {
1062  motdname = gSystem->Getenv("PROOFMOTD");
1063  } else {
1064  motdname = GetConfDir();
1065  motdname += "/etc/proof/motd";
1066  }
1067  if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
1068  if (modtime > lasttime || show) {
1069  if ((motd = fopen(motdname, "r"))) {
1070  Int_t c;
1071  printf("\n");
1072  while ((c = getc(motd)) != EOF)
1073  putchar(c);
1074  fclose(motd);
1075  printf("\n");
1076  }
1077  }
1078  }
1079 
1080  if (lasttime)
1081  gSystem->Unlink(lastname.Data());
1082  Int_t fd = creat(lastname.Data(), 0600);
1083  if (fd >= 0) close(fd);
1084 
1085  return 0;
1086 }
1087 
1088 ////////////////////////////////////////////////////////////////////////////////
1089 /// Get object with name "name;cycle" (e.g. "aap;2") from master or client.
1090 /// This method is called by TDirectory::Get() in case the object can not
1091 /// be found locally.
1092 
1093 TObject *TProofServ::Get(const char *namecycle)
1094 {
1095  if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
1096  Error("Get", "problems sending request");
1097  return (TObject *)0;
1098  }
1099 
1100  TObject *idcur = 0;
1101 
1102  Bool_t notdone = kTRUE;
1103  while (notdone) {
1104  TMessage *mess = 0;
1105  if (fSocket->Recv(mess) < 0)
1106  return 0;
1107  Int_t what = mess->What();
1108  if (what == kMESS_OBJECT) {
1109  idcur = mess->ReadObject(mess->GetClass());
1110  notdone = kFALSE;
1111  } else {
1112  Int_t xrc = HandleSocketInput(mess, kFALSE);
1113  if (xrc == -1) {
1114  Error("Get", "command %d cannot be executed while processing", what);
1115  } else if (xrc == -2) {
1116  Error("Get", "unknown command %d ! Protocol error?", what);
1117  }
1118  }
1119  delete mess;
1120  }
1121 
1122  return idcur;
1123 }
1124 
1125 ////////////////////////////////////////////////////////////////////////////////
1126 /// Reset the compute time
/// Stops the fCompute stopwatch and, when a player is attached (fPlayer set)
/// and `status` is non-null, stores the elapsed real time through
/// status->SetLearnTime() and logs it with Info("RestartComputeTime", ...).
/// NOTE(review): this listing skips its own lines 1128, 1132, 1135 and 1137
/// (the function signature, the declaration of `status`, the arguments of the
/// Info() call and the statement preceding the closing brace) - take them from
/// the real source before editing this function.
1127 
1129 {
1130  fCompute.Stop();
1131  if (fPlayer) {
     // `status` is declared on the missing line 1132; presumably the progress
     // status object of the player - TODO confirm
1133  if (status) status->SetLearnTime(fCompute.RealTime());
     // the two values for "%f secs (%d entries)" are on the missing line 1135
1134  Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
1136  }
1138 }
1139 
1140 ////////////////////////////////////////////////////////////////////////////////
1141 /// Get next range of entries to be processed on this server.
/// The method
///   1. stops the fCompute stopwatch and fills the request message `req` with
///      timing / progress information (layout depends on fProtocol > 18),
///   2. sends `req` over fSocket and saves the partial results of the player,
///   3. waits on fSocket until a kPROOF_GETPACKET reply arrives and returns
///      the TDSetElement streamed out of it (null pointer when the reply
///      carries no element).
/// Messages other than kPROOF_GETPACKET / kPROOF_STOPPROCESS received while
/// waiting are passed to HandleSocketInput(mess, kFALSE).
/// Returns 0 when there is no player (fProtocol > 18 only), when Send() or
/// Recv() return <= 0, or when the reply carries no element.
/// NOTE(review): the listing skips its own lines 1143 (signature; `totalEntries`
/// used below is presumably a parameter - confirm), 1152 (declaration of `req`)
/// and 1160.
1142 
1144 {
1145  Long64_t bytesRead = 0;
1146 
1147  if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
1148 
     // Stop the compute stopwatch only if it was started at least once
1149  if (fCompute.Counter() > 0)
1150  fCompute.Stop();
1151 
1153  Double_t cputime = fCompute.CpuTime();
1154  Double_t realtime = fCompute.RealTime();
1155 
1156  if (fProtocol > 18) {
     // Newer protocol: latency followed by the full progress status object
1157  req << fLatency.RealTime();
1158  TProofProgressStatus *status = 0;
1159  if (fPlayer) {
1161  status = fPlayer->GetProgressStatus();
1162  } else {
1163  Error("GetNextPacket", "no progress status object");
1164  return 0;
1165  }
     // NOTE(review): `status` is dereferenced below without a null check;
     // confirm GetProgressStatus() cannot return 0 here
1166  // the CPU and wallclock proc times are kept in the TProofServ and here
1167  // added to the status object in the fPlayer.
1168  if (status->GetEntries() > 0) {
1169  PDB(kLoop, 2) status->Print(GetOrdinal());
1170  status->IncProcTime(realtime);
1171  status->IncCPUTime(cputime);
1172  }
1173  // Flag cases with problems in opening files
1174  if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
1175  // Add to the message
1176  req << status;
1177  // Send tree cache information
     // (fPlayer is known to be non-null here: the null case returned above,
     // so the -1 fallbacks are never used on this path)
1178  Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
1179  Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
1180  req << cacheSize << learnent;
1181 
1182  // Sent over the number of entries in the file, used by packetizer do not relying
1183  // on initial validation. Also, -1 means that the file could not be open, which is
1184  // used to flag files as missing
1185  req << totalEntries;
1186 
1187  // Send the time spent in saving the partial result to file
1188  if (fProtocol > 34) req << fSaveOutput.RealTime();
1189 
1190  PDB(kLoop, 1) {
1191  PDB(kLoop, 2) status->Print();
1192  Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
1193  }
1194  // Reset the status bits
1195  status->ResetBit(TProofProgressStatus::kFileNotOpen);
1196  status->ResetBit(TProofProgressStatus::kFileCorrupted);
1197  status = 0; // status is owned by the player.
1198  } else {
     // Older protocol: plain sequence of numbers instead of the status object
1199  req << fLatency.RealTime() << realtime << cputime
1200  << bytesRead << totalEntries;
1201  if (fPlayer)
1202  req << fPlayer->GetEventsProcessed();
1203  }
1204 
     // The latency stopwatch runs from sending the request until a
     // kPROOF_GETPACKET / kPROOF_STOPPROCESS message comes back (or Recv fails)
1205  fLatency.Start();
1206  Int_t rc = fSocket->Send(req);
1207  if (rc <= 0) {
1208  Error("GetNextPacket","Send() failed, returned %d", rc);
1209  return 0;
1210  }
1211 
1212  // Save the current output
     // (the time spent here is measured by fSaveOutput and reported with the
     // next request when fProtocol > 34, see above)
1213  if (fPlayer) {
1214  fSaveOutput.Start();
1215  if (fPlayer->SavePartialResults(kFALSE) < 0)
1216  Warning("GetNextPacket", "problems saving partial results");
1217  fSaveOutput.Stop();
1218  }
1219 
     // Wait for the answer; `e` stays 0 unless a packet is received
1220  TDSetElement *e = 0;
1221  Bool_t notdone = kTRUE;
1222  while (notdone) {
1223 
     // `mess` is not initialized here; it is only used after Recv() returned > 0
1224  TMessage *mess;
1225  if ((rc = fSocket->Recv(mess)) <= 0) {
1226  fLatency.Stop();
1227  Error("GetNextPacket","Recv() failed, returned %d", rc);
1228  return 0;
1229  }
1230 
1231  Int_t xrc = 0;
     // NOTE(review): file, dir and obj are not used in the visible code
1232  TString file, dir, obj;
1233 
1234  Int_t what = mess->What();
1235 
1236  switch (what) {
1237  case kPROOF_GETPACKET:
1238 
     // The reply we are waiting for: read the element (may be null = no more
     // work) and restart the compute stopwatch if there is something to do
1239  fLatency.Stop();
1240  (*mess) >> e;
1241  if (e != 0) {
1242  fCompute.Start();
1243  PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
1244  e->GetFileName(), e->GetDirectory(),
1245  e->GetObjName(), e->GetFirst(),e->GetNum());
1246  } else {
1247  PDB(kLoop, 2) Info("GetNextPacket", "Done");
1248  }
1249  notdone = kFALSE;
1250  break;
1251 
1252  case kPROOF_STOPPROCESS:
1253  // if a kPROOF_STOPPROCESS message is returned to kPROOF_GETPACKET
1254  // GetNextPacket() will return 0 and the TPacketizer and hence
1255  // TEventIter will be stopped
     // NOTE(review): `notdone` is not cleared in this case, so the loop goes
     // on receiving after this message - confirm this matches the comment above
1256  fLatency.Stop();
1257  PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
1258  break;
1259 
1260  default:
     // Anything else: process only what is allowed during query processing
1261  xrc = HandleSocketInput(mess, kFALSE);
1262  if (xrc == -1) {
1263  Error("GetNextPacket", "command %d cannot be executed while processing", what);
1264  } else if (xrc == -2) {
1265  Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
1266  }
1267  break;
1268  }
1269 
1270  delete mess;
1271 
1272  }
1273 
1274  // Done
1275  return e;
1276 }
1277 
1278 ////////////////////////////////////////////////////////////////////////////////
1279 /// Get and handle command line options. Fixed format:
1280 /// "proofserv"|"proofslave" <confdir>
1281 
1282 void TProofServ::GetOptions(Int_t *argc, char **argv)
1283 {
1284  Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
1285 
1286  // If test and tty
1287  if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1288  Printf("proofserv: command line testing: OK");
1289  exit(0);
1290  }
1291 
1292  if (!argc || (argc && *argc <= 1)) {
1293  Fatal("GetOptions", "Must be started from proofd with arguments");
1294  exit(1);
1295  }
1296 
1297  if (!strcmp(argv[1], "proofserv")) {
1298  fMasterServ = kTRUE;
1299  fEndMaster = kTRUE;
1300  } else if (!strcmp(argv[1], "proofslave")) {
1301  fMasterServ = kFALSE;
1302  fEndMaster = kFALSE;
1303  } else {
1304  Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
1305  exit(1);
1306  }
1307 
1308  fService = argv[1];
1309 
1310  // Confdir
1311  if (!(gSystem->Getenv("ROOTCONFDIR"))) {
1312  Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
1313  exit(1);
1314  }
1315  fConfDir = gSystem->Getenv("ROOTCONFDIR");
1316 }
1317 
1318 ////////////////////////////////////////////////////////////////////////////////
1319 /// Handle input coming from the client or from the master server.
/// Receives one message from fSocket and passes it to the two-argument
/// HandleSocketInput(mess, all). `all` is kTRUE only for the outermost
/// invocation (fgRecursive == 0 on entry). Messages for which the handler
/// returns 2 are appended to fQueuedMsg; queued messages are processed when
/// control is back at the outermost level (fgRecursive == 1).
/// Any exception caught around the processing is logged, notified with
/// SendAsynMessage() and followed by Terminate(0).
/// NOTE(review): the listing skips its own lines 1321 (signature), 1324, 1398,
/// 1414, 1429, 1446, 1451 and 1453; the statements on them are not visible.
1320 
1322 {
1323  // The idle timeout guard: stops the timer and restarts when we return from here
     // (the guard object itself is declared on the missing line 1324)
1325 
     // Full processing is allowed only when we are not already inside a handler
1326  Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
1327  fgRecursive++;
1328 
1329  TMessage *mess;
1330  Int_t rc = 0;
1331  TString exmsg;
1332 
1333  // Check log file length (before the action, so we have the chance to keep the
1334  // latest logs)
1335  TruncateLogFile();
1336 
1337  try {
1338 
1339  // Get message
1340  if (fSocket->Recv(mess) <= 0 || !mess) {
1341  // Pending: do something more intelligent here
1342  // but at least get a message in the log file
1343  Error("HandleSocketInput", "retrieving message from input socket");
1344  Terminate(0);
     // NOTE(review): this return is reached without decrementing fgRecursive
     // (only relevant if Terminate(0) returns) - confirm
1345  return;
1346  }
1347  Int_t what = mess->What();
1348  PDB(kCollect, 1)
1349  Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
1350 
1351  fNcmd++;
1352 
1353  if (fProof) fProof->SetActive();
1354 
1355  Bool_t doit = kTRUE;
1356 
1357  while (doit) {
1358 
1359  // Process the message
     // (returns -3 when `mess` is null, e.g. nothing could be taken from the queue)
1360  rc = HandleSocketInput(mess, all);
1361  if (rc < 0) {
     // NOTE(review): `what` is set once from the received message and is not
     // refreshed for messages taken from the queue, so the texts below may
     // quote the type of the original message - confirm
1362  TString emsg;
1363  if (rc == -1) {
1364  emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
1365  } else if (rc == -3) {
1366  emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
1367  } else {
1368  emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
1369  }
1370  SendAsynMessage(emsg.Data());
1371  } else if (rc == 2) {
1372  // Add to the queue
1373  fQueuedMsg->Add(mess);
1374  PDB(kGlobal, 1)
1375  Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
1376  what, fQueuedMsg->GetSize());
     // The queue now holds the message: make sure it is not deleted below
1377  mess = 0;
1378  }
1379 
1380  // Still something to do?
1381  doit = 0;
1382  if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
1383  // Add to the queue
     // (this branch actually takes the first message out of the queue and
     // loops once more to process it with all = 1)
1384  PDB(kCollect, 1)
1385  Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
1386  what, fQueuedMsg->GetSize());
1387  all = 1;
1388  SafeDelete(mess);
1389  mess = (TMessage *) fQueuedMsg->First();
1390  if (mess) fQueuedMsg->Remove(mess);
1391  doit = 1;
1392  }
1393  }
1394 
1395  } catch (std::bad_alloc &) {
1396  // Memory allocation problem:
     // (the arguments of this Form() call are on the missing line 1398)
1397  exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s %lld",
1399  } catch (std::exception &exc) {
1400  // Standard exception caught
1401  exmsg.Form("caught standard exception '%s' %s %lld",
1402  exc.what(), fgLastMsg.Data(), fgLastEntry);
1403  } catch (int i) {
1404  // Other exception caught
1405  exmsg.Form("caught exception throwing %d %s %lld",
1406  i, fgLastMsg.Data(), fgLastEntry);
1407  } catch (const char *str) {
1408  // Other exception caught
1409  exmsg.Form("caught exception throwing '%s' %s %lld",
1410  str, fgLastMsg.Data(), fgLastEntry);
1411  } catch (...) {
1412  // Caught other exception
     // (the arguments of this Form() call are on the missing line 1414)
1413  exmsg.Form("caught exception <unknown> %s %lld",
1415  }
1416 
1417  // Terminate on exception
     // (a non-empty exmsg means one of the handlers above was entered)
1418  if (!exmsg.IsNull()) {
1419  // Save info in the log file too
1420  Error("HandleSocketInput", "%s", exmsg.Data());
1421  // Try to warn the user
1422  SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1423  // Terminate
1424  Terminate(0);
1425  }
1426 
1427  // Terminate also if a high memory footprint was detected before the related
1428  // exception was thrown
     // (the condition opening this block is on the missing line 1429)
1430  // Save info in the log file too
1431  exmsg.Form("high-memory footprint detected during Process(...) - terminating");
1432  Error("HandleSocketInput", "%s", exmsg.Data());
1433  // Try to warn the user
1434  SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1435  // Terminate
1436  Terminate(0);
1437  }
1438 
1439  fgRecursive--;
1440 
1441  if (fProof) {
1442  // If something wrong went on during processing and we do not have
1443  // any worker anymore, we shutdown this session
1444  Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
1445  Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
     // `ngwrks` is declared on the missing line 1446; presumably the number of
     // good workers - TODO confirm
1447  if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1448  SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
1449  Terminate(0);
1450  }
1452  // Reset PROOF to running state
     // (the statement doing so is on the missing line 1453)
1454  }
1455 
1456  // Cleanup
1457  SafeDelete(mess);
1458 }
1459 
1460 ////////////////////////////////////////////////////////////////////////////////
1461 /// Process input coming from the client or from the master server.
1462 /// If 'all' is kFALSE, process only those messages that can be handled
1463 /// during query processing.
1464 /// Returns -1 if the message could not be processed, <-1 if something went
1465 /// wrong. Returns 1 if the action may have changed the parallel state.
1466 /// Returns 2 if the message has to be enqueued.
1467 /// Returns 0 otherwise
/// In detail, as visible in the code below: -3 is returned when `mess` is
/// null and -2 for a message type not handled by the switch.
/// The real and CPU time spent here are added to fRealTime / fCpuTime, and a
/// line is sent to syslog when `slb` was filled by a handler or
/// fgLogToSysLog > 1.
/// NOTE(review): the listing skips its own line 1469 (the signature; `mess`
/// and `all` are presumably its parameters - confirm) and many lines holding
/// local declarations (e.g. of `m`, `answ`, `opt`, `p`); they are pointed out
/// where they matter.
1468 
1470 {
1471  static TStopwatch timer;
1472  char str[2048];
1473  Bool_t aborted = kFALSE;
1474 
1475  if (!mess) return -3;
1476 
1477  Int_t what = mess->What();
1478  PDB(kCollect, 1)
1479  Info("HandleSocketInput", "processing message type %d from '%s'",
1480  what, fSocket->GetTitle());
1481 
1482  timer.Start();
1483 
1484  Int_t rc = 0, lirc = 0;
     // `slb` collects the text for syslog; handlers receive a pointer to it
     // only when syslog logging is enabled (fgLogToSysLog > 0)
1485  TString slb;
1486  TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
1487 
1488  switch (what) {
1489 
1490  case kMESS_CINT:
     // Command line to be interpreted
1491  if (all) {
1492  mess->ReadString(str, sizeof(str));
1493  // Make sure that the relevant files are available
1494  TString fn;
1495 
1496  Bool_t hasfn = TProof::GetFileInCmd(str, fn);
1497 
     // Forward to the workers when running in parallel with static startup,
     // otherwise execute locally
1498  if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
1499  fProof->SendCommand(str);
1500  } else {
1501  PDB(kGlobal, 1)
1502  Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
1503  TString ocwd;
1504  if (hasfn) {
     // The command refers to a file: hold the cache lock and remember the
     // current directory (the statement on the missing line 1507 presumably
     // changes directory - TODO confirm)
1505  fCacheLock->Lock();
1506  ocwd = gSystem->WorkingDirectory();
1508  }
1509  ProcessLine(str);
1510  if (hasfn) {
1511  gSystem->ChangeDirectory(ocwd);
1512  fCacheLock->Unlock();
1513  }
1514  }
1515 
1516  LogToMaster();
1517  } else {
1518  rc = -1;
1519  }
1520  SendLogFile();
     // NOTE(review): when `all` is kFALSE `str` has not been filled at this
     // point - confirm this copy is harmless
1521  if (pslb) slb = str;
1522  break;
1523 
1524  case kMESS_STRING:
     // The string is read from the message and not used further
1525  if (all) {
1526  mess->ReadString(str, sizeof(str));
1527  } else {
1528  rc = -1;
1529  }
1530  break;
1531 
1532  case kMESS_OBJECT:
     // NOTE(review): the pointer returned by ReadObject() is discarded -
     // confirm the object is not leaked
1533  if (all) {
1534  mess->ReadObject(mess->GetClass());
1535  } else {
1536  rc = -1;
1537  }
1538  break;
1539 
1540  case kPROOF_GROUPVIEW:
     // Payload: "<group id> <group size>"
1541  if (all) {
1542  mess->ReadString(str, sizeof(str));
1543  // coverity[secure_coding]
1544  sscanf(str, "%d %d", &fGroupId, &fGroupSize);
1545  } else {
1546  rc = -1;
1547  }
1548  break;
1549 
1550  case kPROOF_LOGLEVEL:
     // Payload: "<level> <mask>"; handled regardless of `all`.
     // NOTE(review): the sscanf result is not checked, so `mask` may be used
     // uninitialized on a malformed string. Listing lines 1555, 1556 and 1559
     // are missing.
1551  { UInt_t mask;
1552  mess->ReadString(str, sizeof(str));
1553  sscanf(str, "%d %u", &fLogLevel, &mask);
1554  Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
1557  if (levelchanged)
1558  Info("HandleSocketInput:kPROOF_LOGLEVEL", "debug level set to %d (mask: 0x%x)",
1560  if (IsMaster())
1561  fProof->SetLogLevel(fLogLevel, mask);
1562  }
1563  break;
1564 
1565  case kPROOF_PING:
1566  { if (IsMaster())
1567  fProof->Ping();
1568  // do nothing (ping is already acknowledged)
1569  }
1570  break;
1571 
1572  case kPROOF_PRINT:
     // Handled regardless of `all`
1573  mess->ReadString(str, sizeof(str));
1574  Print(str);
1575  LogToMaster();
1576  SendLogFile();
1577  break;
1578 
1579  case kPROOF_RESET:
1580  if (all) {
1581  mess->ReadString(str, sizeof(str));
1582  Reset(str);
1583  } else {
1584  rc = -1;
1585  }
1586  break;
1587 
1588  case kPROOF_STATUS:
     // (the condition guarding the second warning is on the missing line 1591)
1589  Warning("HandleSocketInput:kPROOF_STATUS",
1590  "kPROOF_STATUS message is obsolete");
1592  Warning("HandleSocketInput:kPROOF_STATUS", "problem sending of request");
1593  break;
1594 
1595  case kPROOF_GETSTATS:
1596  SendStatistics();
1597  break;
1598 
1599  case kPROOF_GETPARALLEL:
1600  SendParallel();
1601  break;
1602 
1603  case kPROOF_STOP:
     // Master: terminate the worker whose ordinal is in the message.
     // Worker: terminate this server.
1604  if (all) {
1605  if (IsMaster()) {
1606  TString ord;
1607  *mess >> ord;
1608  PDB(kGlobal, 1)
1609  Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
1610  if (fProof) fProof->TerminateWorker(ord);
1611  } else {
1612  PDB(kGlobal, 1)
1613  Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
1614  Terminate(0);
1615  }
1616  } else {
1617  rc = -1;
1618  }
1619  break;
1620 
1621  case kPROOF_STOPPROCESS:
     // Unlike most cases this one acts only when `all` is kFALSE, i.e. while a
     // query is being processed
1622  if (all) {
1623  // this message makes only sense when the query is being processed,
1624  // however the message can also be received if the user pressed
1625  // ctrl-c, so ignore it!
1626  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
1627  } else {
1628  Long_t timeout = -1;
1629  (*mess) >> aborted;
     // The timeout is part of the message only for fProtocol > 9
1630  if (fProtocol > 9)
1631  (*mess) >> timeout;
1632  PDB(kGlobal, 1)
1633  Info("HandleSocketInput:kPROOF_STOPPROCESS",
1634  "recursive mode: enter %d, %ld", aborted, timeout);
1635  if (fProof)
1636  // On the master: propagate further
1637  fProof->StopProcess(aborted, timeout);
1638  else
1639  // Worker: actually stop processing
1640  if (fPlayer)
1641  fPlayer->StopProcess(aborted, timeout);
1642  }
1643  break;
1644 
1645  case kPROOF_PROCESS:
     // (listing line 1647 is missing)
1646  {
1648  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
1649  HandleProcess(mess, pslb);
1650  // The log file is send either in HandleProcess or HandleSubmergers.
1651  // The reason is that the order of various messages depend on the
1652  // processing mode (sync/async) and/or merging mode
1653  }
1654  break;
1655 
1656  case kPROOF_SENDOUTPUT:
     // NOTE(review): fPlayer is dereferenced without a null check - confirm it
     // is always set when this message arrives. Listing line 1666 is missing.
1657  {
1658  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
1659  "worker was asked to send output to master");
1660  Int_t sorc = 0;
1661  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
1662  Error("HandleSocketInput:kPROOF_SENDOUTPUT", "problems sending output list");
1663  sorc = 1;
1664  }
1665  // Signal the master that we are idle
1667  SetIdle(kTRUE);
1668  DeletePlayer();
1669  SendLogFile(sorc);
1670  }
1671  break;
1672 
1673  case kPROOF_QUERYLIST:
1674  {
1675  HandleQueryList(mess);
1676  // Notify
1677  SendLogFile();
1678  }
1679  break;
1680 
1681  case kPROOF_REMOVE:
1682  {
1683  HandleRemove(mess, pslb);
1684  // Notify
1685  SendLogFile();
1686  }
1687  break;
1688 
1689  case kPROOF_RETRIEVE:
1690  {
1691  HandleRetrieve(mess, pslb);
1692  // Notify
1693  SendLogFile();
1694  }
1695  break;
1696 
1697  case kPROOF_ARCHIVE:
1698  {
1699  HandleArchive(mess, pslb);
1700  // Notify
1701  SendLogFile();
1702  }
1703  break;
1704 
1705  case kPROOF_MAXQUERIES:
     // Reply with the value of fMaxQueries (`m` is declared on the missing
     // line 1708)
1706  { PDB(kGlobal, 1)
1707  Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
1709  m << fMaxQueries;
1710  fSocket->Send(m);
1711  // Notify
1712  SendLogFile();
1713  }
1714  break;
1715 
1716  case kPROOF_CLEANUPSESSION:
     // Payload: the tag of the session to clean up
1717  if (all) {
1718  PDB(kGlobal, 1)
1719  Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
1720  TString stag;
1721  (*mess) >> stag;
1722  if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
1723  Printf("Session %s cleaned up", stag.Data());
1724  } else {
1725  Printf("Could not cleanup session %s", stag.Data());
1726  }
1727  } else {
1728  rc = -1;
1729  }
1730  // Notify
1731  SendLogFile();
1732  break;
1733 
1734  case kPROOF_GETENTRIES:
     // A reply is always sent; when `all` is kFALSE it carries the defaults
     // (entries = -1, objname "undef"). `answ` is declared on the missing
     // line 1754.
1735  { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
1736  Bool_t isTree;
1737  TString filename;
1738  TString dir;
1739  TString objname("undef");
1740  Long64_t entries = -1;
1741 
1742  if (all) {
1743  (*mess) >> isTree >> filename >> dir >> objname;
1744  PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1745  "Report size of object %s (%s) in dir %s in file %s",
1746  objname.Data(), isTree ? "T" : "O",
1747  dir.Data(), filename.Data());
1748  entries = TDSet::GetEntries(isTree, filename, dir, objname);
1749  PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1750  "Found %lld %s", entries, isTree ? "entries" : "objects");
1751  } else {
1752  rc = -1;
1753  }
1755  answ << entries << objname;
1756  SendLogFile(); // in case of error messages
1757  fSocket->Send(answ);
1758  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
1759  }
1760  break;
1761 
1762  case kPROOF_CHECKFILE:
     // For fProtocol <= 19 the request is postponed (rc = 2 asks the caller to
     // enqueue it) when it arrives during processing
1763  if (!all && fProtocol <= 19) {
1764  // Come back later
1765  rc = 2;
1766  } else {
1767  // Handle file checking request
1768  HandleCheckFile(mess, pslb);
1769  FlushLogFile(); // Avoid sending (error) messages at next action
1770  }
1771  break;
1772 
1773  case kPROOF_SENDFILE:
1774  if (!all && fProtocol <= 19) {
1775  // Come back later
1776  rc = 2;
1777  } else {
     // Payload: "<name> <bin> <size> [<fw>]"; the forward flag is present only
     // for fProtocol > 5 and defaults to 1.
     // NOTE(review): the sscanf results are not checked, so name, bin and size
     // may be used uninitialized on a malformed string
1778  mess->ReadString(str, sizeof(str));
1779  Long_t size;
1780  Int_t bin, fw = 1;
1781  char name[1024];
1782  if (fProtocol > 5) {
1783  sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
1784  } else {
1785  sscanf(str, "%1023s %d %ld", name, &bin, &size);
1786  }
1787  TString fnam(name);
     // A "cache:" prefix means the file goes straight into the cache
     // directory, so no extra copy is needed afterwards
1788  Bool_t copytocache = kTRUE;
1789  if (fnam.BeginsWith("cache:")) {
1790  fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
1791  copytocache = kFALSE;
1792  }
1793 
1794  Int_t rfrc = 0;
1795  if (size > 0) {
1796  rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
1797  } else {
1798  // Take it from the cache
1799  if (!fnam.BeginsWith(fCacheDir.Data())) {
1800  fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
1801  }
1802  }
1803  if (rfrc == 0) {
1804  // copy file to cache if not a PAR file
     // NOTE(review): the copy goes through a shell command assembled from the
     // received file name - confirm the name cannot carry shell metacharacters
1805  if (copytocache && size > 0 && !fPackMgr->IsInDir(name))
1806  gSystem->Exec(TString::Format("%s %s %s", kCP, fnam.Data(), fCacheDir.Data()));
1807  if (IsMaster() && fw == 1) {
     // Forward the file to the workers (`opt` is declared on the missing
     // line 1808)
1809  if (bin)
1810  opt |= TProof::kBinary;
1811  PDB(kGlobal, 1)
1812  Info("HandleSocketInput","forwarding file: %s", fnam.Data());
1813  if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
1814  Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
1815  }
1816  }
     // Acknowledge reception (newer protocols only)
1817  if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
1818  } else {
1819  // There was an error
1820  SendLogFile(1);
1821  }
1822  }
1823  break;
1824 
1825  case kPROOF_LOGFILE:
     // Payload: byte range of the log file to send back
1826  {
1827  Int_t start, end;
1828  (*mess) >> start >> end;
1829  PDB(kGlobal, 1)
1830  Info("HandleSocketInput:kPROOF_LOGFILE",
1831  "Logfile request - byte range: %d - %d", start, end);
1832 
1833  LogToMaster();
1834  SendLogFile(0, start, end);
1835  }
1836  break;
1837 
1838  case kPROOF_PARALLEL:
     // Only the master acts on this; rc = 1 tells the caller that the parallel
     // state may have changed
1839  if (all) {
1840  if (IsMaster()) {
1841  Int_t nodes;
1842  Bool_t random = kFALSE;
1843  (*mess) >> nodes;
     // The `random` flag is optional: read it only if the buffer has more data
1844  if ((mess->BufferSize() > mess->Length()))
1845  (*mess) >> random;
1846  if (fProof) fProof->SetParallel(nodes, random);
1847  rc = 1;
1848  }
1849  } else {
1850  rc = -1;
1851  }
1852  // Notify
1853  SendLogFile();
1854  break;
1855 
1856  case kPROOF_CACHE:
     // (listing line 1861 is missing)
1857  if (!all && fProtocol <= 19) {
1858  // Come back later
1859  rc = 2;
1860  } else {
1862  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
1863  Int_t hcrc = HandleCache(mess, pslb);
1864  // Notify
1865  SendLogFile(hcrc);
1866  }
1867  break;
1868 
1869  case kPROOF_WORKERLISTS:
     // `wlrc` stays -1 (reported through SendLogFile) unless the master
     // handled the request
1870  { Int_t wlrc = -1;
1871  if (all) {
1872  if (IsMaster())
1873  wlrc = HandleWorkerLists(mess);
1874  else
1875  Warning("HandleSocketInput:kPROOF_WORKERLISTS",
1876  "Action meaning-less on worker nodes: protocol error?");
1877  } else {
1878  rc = -1;
1879  }
1880  // Notify
1881  SendLogFile(wlrc);
1882  }
1883  break;
1884 
1885  case kPROOF_GETSLAVEINFO:
     // Master: reply with the list of worker infos (starting workers first in
     // dynamic-startup mode). Worker: reply with a one-entry list about this
     // node. When `all` is kFALSE a null list is sent back.
     // Listing lines 1912, 1916, 1919, 1921 and 1934 (declarations of `answ`
     // and `wi`, and the statement stopping the workers) are missing.
1886  if (all) {
1887  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
1888  if (IsMaster()) {
1889 
1890  Bool_t ok = kTRUE;
1891  // if the session does not have workers and is in the dynamic mode
1892  if (fProof->UseDynamicStartup()) {
1893  ok = kFALSE;
1894  // get the a list of workers and start them
1895  Int_t pc = 0;
     // NOTE(review): no delete of workerList is visible in this branch -
     // confirm who owns the list after GetWorkers() / AddWorkers()
1896  TList* workerList = new TList();
1897  EQueryAction retVal = GetWorkers(workerList, pc);
1898  if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
1899  Int_t ret = fProof->AddWorkers(workerList);
1900  if (ret < 0) {
1901  Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1902  "adding a list of worker nodes returned: %d", ret);
1903  }
1904  } else {
1905  Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1906  "getting list of worker nodes returned: %d", retVal);
1907  }
     // NOTE(review): `ok` is set back to kTRUE whatever happened above, so
     // the reply below is always sent
1908  ok = kTRUE;
1909  }
1910  if (ok) {
1911  TList *info = fProof->GetListOfSlaveInfos();
1913  answ << info;
1914  fSocket->Send(answ);
1915  // stop the workers
1917  }
1918  } else {
1920  TList *info = new TList;
1922  SysInfo_t si;
1923  gSystem->GetSysInfo(&si);
1924  wi->SetSysInfo(si);
1925  info->Add(wi);
1926  answ << (TList *)info;
1927  fSocket->Send(answ);
     // The temporary list owns `wi`, so deleting the list frees both
1928  info->SetOwner(kTRUE);
1929  delete info;
1930  }
1931 
1932  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
1933  } else {
1935  answ << (TList *)0;
1936  fSocket->Send(answ);
1937  rc = -1;
1938  }
1939  break;
1940 
1941  case kPROOF_GETTREEHEADER:
     // The request is served by a temporary player `p` (created on the missing
     // line 1945) which is deleted right after use. When `all` is kFALSE a
     // "Failed" reply is sent (`answ` is declared on the missing line 1955).
1942  if (all) {
1943  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
1944 
1946  if (p) {
1947  p->HandleGetTreeHeader(mess);
1948  delete p;
1949  } else {
1950  Error("HandleSocketInput:kPROOF_GETTREEHEADER", "could not create TProofPlayer instance!");
1951  }
1952 
1953  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
1954  } else {
1956  answ << TString("Failed") << (TObject *)0;
1957  fSocket->Send(answ);
1958  rc = -1;
1959  }
1960  break;
1961 
1962  case kPROOF_GETOUTPUTLIST:
     // Handled regardless of `all`. On the master the output list of fProof is
     // sent; otherwise a list holding only the names of the output objects.
     // NOTE(review): the non-master branch dereferences fProof - confirm it is
     // set there. Also, the list returned by fProof->GetOutputList() is made
     // owner and deleted below - confirm the ownership.
     // `answ` is declared on the missing line 1981.
1963  { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
1964  TList* outputList = 0;
1965  if (IsMaster()) {
1966  outputList = fProof->GetOutputList();
1967  if (!outputList)
1968  outputList = new TList();
1969  } else {
1970  outputList = new TList();
1971  if (fProof->GetPlayer()) {
1972  TList *olist = fProof->GetPlayer()->GetOutputList();
1973  TIter next(olist);
1974  TObject *o;
1975  while ( (o = next()) ) {
1976  outputList->Add(new TNamed(o->GetName(), ""));
1977  }
1978  }
1979  }
1980  outputList->SetOwner();
1982  answ << outputList;
1983  fSocket->Send(answ);
1984  delete outputList;
1985  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
1986  }
1987  break;
1988 
1989  case kPROOF_VALIDATE_DSET:
     // The data set is read from the message, validated and sent back.
     // NOTE(review): `dset` is used without a null check after streaming -
     // confirm it cannot be null. `answ` is declared on the missing line 2000.
1990  if (all) {
1991  PDB(kGlobal, 1)
1992  Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
1993 
1994  TDSet* dset = 0;
1995  (*mess) >> dset;
1996 
1997  if (IsMaster()) fProof->ValidateDSet(dset);
1998  else dset->Validate();
1999 
2001  answ << dset;
2002  fSocket->Send(answ);
2003  delete dset;
2004  PDB(kGlobal, 1)
2005  Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
2006  } else {
2007  rc = -1;
2008  }
2009  // Notify
2010  SendLogFile();
2011  break;
2012 
2013  case kPROOF_DATA_READY:
     // A reply "<ready> <total bytes> <bytes ready>" is always sent; only the
     // master fills it with real values (`answ` is declared on the missing
     // lines 2016 and 2029)
2014  if (all) {
2015  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
2017  if (IsMaster()) {
2018  Long64_t totalbytes = 0, bytesready = 0;
2019  Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
2020  answ << dataready << totalbytes << bytesready;
2021  } else {
2022  Error("HandleSocketInput:kPROOF_DATA_READY",
2023  "This message should not be sent to slaves");
2024  answ << kFALSE << Long64_t(0) << Long64_t(0);
2025  }
2026  fSocket->Send(answ);
2027  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
2028  } else {
2030  answ << kFALSE << Long64_t(0) << Long64_t(0);
2031  fSocket->Send(answ);
2032  rc = -1;
2033  }
2034  // Notify
2035  SendLogFile();
2036  break;
2037 
2038  case kPROOF_DATASETS:
     // Dataset requests are served only for fProtocol > 16; otherwise the log
     // file is sent with the initial -1
2039  { Int_t dsrc = -1;
2040  if (fProtocol > 16) {
2041  dsrc = HandleDataSets(mess, pslb);
2042  } else {
2043  Error("HandleSocketInput", "old client: no or incompatible dataset support");
2044  }
2045  SendLogFile(dsrc);
2046  }
2047  break;
2048 
2049  case kPROOF_SUBMERGER:
2050  { HandleSubmerger(mess);
2051  }
2052  break;
2053 
2054  case kPROOF_LIB_INC_PATH:
     // `lirc` starts at 0, so the log file is sent only when the handler ran
     // and returned a positive value
2055  if (all) {
2056  lirc = HandleLibIncPath(mess);
2057  } else {
2058  rc = -1;
2059  }
2060  // Notify the client
2061  if (lirc > 0) SendLogFile();
2062  break;
2063 
2064  case kPROOF_REALTIMELOG:
     // Handled regardless of `all`
2065  { Bool_t on;
2066  (*mess) >> on;
2067  PDB(kGlobal, 1)
2068  Info("HandleSocketInput:kPROOF_REALTIMELOG",
2069  "setting real-time logging %s", (on ? "ON" : "OFF"));
2070  fRealTimeLog = on;
2071  // Forward the request to lower levels
2072  if (IsMaster())
2073  fProof->SetRealTimeLog(on);
2074  }
2075  break;
2076 
2077  case kPROOF_FORK:
2078  if (all) {
2079  HandleFork(mess);
2080  LogToMaster();
2081  } else {
2082  rc = -1;
2083  }
2084  SendLogFile();
2085  break;
2086 
2087  case kPROOF_STARTPROCESS:
     // Note that nothing is done (and rc stays 0) when `all` is kFALSE
2088  if (all) {
2089  // This message resumes the session; should not come during processing.
2090 
2091  if (WaitingQueries() == 0) {
2092  Error("HandleSocketInput", "no queries enqueued");
2093  break;
2094  }
2095 
2096  // Similar to handle process
2097  // get the list of workers and start them
     // A worker list is allocated only in dynamic-startup mode
2098  TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
2099  Int_t pc = 0;
2100  EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
2101 
2102  if (retVal == TProofServ::kQueryOK) {
2103  Int_t ret = 0;
2104  if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
2105  Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
2106  } else {
2107  ProcessNext(pslb);
2108  // Set idle
2109  SetIdle(kTRUE);
2110  // Signal the client that we are idle
     // (`m` is declared on the missing line 2111)
2112  Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
2113  m << waiting;
2114  fSocket->Send(m);
2115  }
2116  } else {
     // NOTE(review): as written the "re-queued" warning is issued when retVal
     // is NOT kQueryEnqueued and "unexpected answer" when it IS - the test
     // looks inverted, confirm against the intended behaviour
2117  if (retVal == TProofServ::kQueryStop) {
2118  Error("HandleSocketInput", "error getting list of worker nodes");
2119  } else if (retVal != TProofServ::kQueryEnqueued) {
2120  Warning("HandleSocketInput", "query was re-queued!");
2121  } else {
2122  Error("HandleSocketInput", "unexpected answer: %d", retVal);
2123  break;
2124  }
2125  }
2126 
2127  }
2128  break;
2129 
2130  case kPROOF_GOASYNC:
2131  { // The client requested to switch to asynchronous mode:
2132  // communicate the sequential number of the running query for later
2133  // identification, if any
2134  if (!IsIdle() && fPlayer) {
2135  // Get query currently being processed
     // (`pq` and `m` are declared on the missing lines 2136-2137)
2138  m << pq->GetSeqNum() << kFALSE;
2139  fSocket->Send(m);
2140  } else {
2141  // Idle or undefined: nothing to do; ignore
2142  SendAsynMessage("Processing request to go asynchronous:"
2143  " idle or undefined player - ignoring");
2144  }
2145  }
2146  break;
2147 
2148  case kPROOF_ECHO:
2149  { // Echo request: an object has been sent along. If the object is a
2150  // string, it is simply echoed back to the client from the master
2151  // and each worker. Elsewhere, the output of TObject::Print() is
2152  // sent. Received object is disposed after usage.
2153 
     // NOTE(review): `obj` is used below without a null check - confirm
     // ReadObject() cannot return 0 here
2154  TObject *obj = mess->ReadObject(0x0); // class type ignored
2155 
2156  if (IsMaster()) {
2157  // We are on master
2158  // dbTODO: forward on dynamic startup when wrks are up
2159  if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
2160  fProof->Echo(obj); // forward to lower layer
2161  }
2162  }
2163 
2164  TMessage rmsg(kPROOF_MESSAGE);
2165  TString smsg;
2166 
2167  if (obj->InheritsFrom(TObjString::Class())) {
2168  // It's a string: echo it
2169  smsg.Form("Echo response from %s:%s: %s",
2170  gSystem->HostName(), GetOrdinal(),
2171  ((TObjString *)obj)->String().Data());
2172  }
2173  else {
2174  // Not a string: collect Print() output and send it
2175 
2176  // Output to tempfile
2177  TString tmpfn = "echo-out-";
2178  FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
2179  if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
2180  Error("HandleSocketInput", "Can't redirect output");
2181  if (tf) {
2182  fclose(tf);
2183  gSystem->Unlink(tmpfn);
2184  }
     // Give up: no reply is sent in this case; this break leaves the switch
2185  rc = -1;
2186  delete obj;
2187  break;
2188  }
2189  //cout << obj->ClassName() << endl;
2190  obj->Print();
2191  gSystem->RedirectOutput(0x0); // restore
2192  fclose(tf);
2193 
2194  // Read file back and send it via message
2195  smsg.Form("*** Echo response from %s:%s ***\n",
2196  gSystem->HostName(), GetOrdinal());
2197  TMacro *fr = new TMacro();
2198  fr->ReadFile(tmpfn);
2199  TIter nextLine(fr->GetListOfLines());
2200  TObjString *line;
2201  while (( line = (TObjString *)nextLine() )) {
2202  smsg.Append( line->String() );
2203  }
2204 
2205  // Close the reader (TMacro) and remove file
2206  delete fr;
2207  gSystem->Unlink(tmpfn);
2208  }
2209 
2210  // Send message and dispose object
2211  rmsg << smsg;
2212  GetSocket()->Send(rmsg);
2213  delete obj;
2214  }
2215  break;
2216 
2217  default:
2218  Error("HandleSocketInput", "unknown command %d", what);
2219  rc = -2;
2220  break;
2221  }
2222 
     // Account for the time spent handling this message
2223  fRealTime += (Float_t)timer.RealTime();
2224  fCpuTime += (Float_t)timer.CpuTime();
2225 
     // Syslog entry: "<entity> <message type> <real time> <cpu time> <text>"
2226  if (!(slb.IsNull()) || fgLogToSysLog > 1) {
2227  TString s;
2228  s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
2229  what, timer.RealTime(), timer.CpuTime(), slb.Data());
2230  gSystem->Syslog(kLogNotice, s.Data());
2231  }
2232 
2233  // Done
2234  return rc;
2235 }
2236 
2237 ////////////////////////////////////////////////////////////////////////////////
2238 /// Accept and merge results from a set of workers
2239 
2241 {
2242  TMessage *mess = new TMessage();
2243  Int_t mergedWorkers = 0;
2244 
2245  PDB(kSubmerger, 1) Info("AcceptResults", "enter");
2246 
2247  // Overall result of this procedure
2248  Bool_t result = kTRUE;
2249 
2250  fMergingMonitor = new TMonitor();
2252 
2253  Int_t numworkers = 0;
2254  while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
2255 
2257  if (!s) {
2258  Info("AcceptResults", "interrupt!");
2259  result = kFALSE;
2260  break;
2261  }
2262 
2263  if (s == fMergingSocket) {
2264  // New incoming connection
2265  TSocket *sw = fMergingSocket->Accept();
2266  if (sw && sw != (TSocket *)(-1)) {
2267  fMergingMonitor->Add(sw);
2268 
2269  PDB(kSubmerger, 2)
2270  Info("AcceptResults", "connection from a worker accepted on merger %s ",
2271  fOrdinal.Data());
2272  // All assigned workers are connected
2273  if (++numworkers >= connections)
2275  } else {
2276  PDB(kSubmerger, 1)
2277  Info("AcceptResults", "spurious signal found of merging socket");
2278  }
2279  } else {
2280  if (s->Recv(mess) < 0) {
2281  Error("AcceptResults", "problems receiving message");
2282  continue;
2283  }
2284  PDB(kSubmerger, 2)
2285  Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
2286  if (!mess) {
2287  Error("AcceptResults", "message received: %p ", mess);
2288  continue;
2289  }
2290  Int_t type = 0;
2291 
2292  // Read output object(s) from the received message
2293  while ((mess->BufferSize() > mess->Length())) {
2294  (*mess) >> type;
2295 
2296  PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
2297  if (type == 2) {
2298  mergedWorkers++;
2299  PDB(kSubmerger, 2)
2300  Info("AcceptResults",
2301  "a new worker has been mergerd. Total merged workers: %d",
2302  mergedWorkers);
2303  }
2304  TObject *o = mess->ReadObject(TObject::Class());
2305  if (mergerPlayer->AddOutputObject(o) == 1) {
2306  // Remove the object if it has been merged
2307  PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
2308  SafeDelete(o);
2309  } else
2310  PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
2311  }
2312  }
2313  }
2315 
2317  Int_t size = sockets->GetSize();
2318  for (Int_t i =0; i< size; ++i){
2319  ((TSocket*)(sockets->At(i)))->Close();
2320  PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
2321  delete ((TSocket*)(sockets->At(i)));
2322  }
2323  delete sockets;
2324 
2327 
2328  PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
2329  return result;
2330 }
2331 
2332 ////////////////////////////////////////////////////////////////////////////////
2333 /// Handle Out-Of-Band data sent by the master or client.
2334 
2336 {
2337  char oob_byte;
2338  Int_t n, nch, wasted = 0;
2339 
2340  const Int_t kBufSize = 1024;
2341  char waste[kBufSize];
2342 
2343  // Real-time notification of messages
2345 
2346  PDB(kGlobal, 5)
2347  Info("HandleUrgentData", "handling oob...");
2348 
2349  // Receive the OOB byte
2350  while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
2351  if (n == -2) { // EWOULDBLOCK
2352  //
2353  // The OOB data has not yet arrived: flush the input stream
2354  //
2355  // In some systems (Solaris) regular recv() does not return upon
2356  // receipt of the oob byte, which makes the below call to recv()
2357  // block indefinitely if there are no other data in the queue.
2358  // FIONREAD ioctl can be used to check if there are actually any
2359  // data to be flushed. If not, wait for a while for the oob byte
2360  // to arrive and try to read it again.
2361  //
2363  if (nch == 0) {
2364  gSystem->Sleep(1000);
2365  continue;
2366  }
2367 
2368  if (nch > kBufSize) nch = kBufSize;
2369  n = fSocket->RecvRaw(waste, nch);
2370  if (n <= 0) {
2371  Error("HandleUrgentData", "error receiving waste");
2372  break;
2373  }
2374  wasted = 1;
2375  } else {
2376  Error("HandleUrgentData", "error receiving OOB");
2377  return;
2378  }
2379  }
2380 
2381  PDB(kGlobal, 5)
2382  Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
2383 
2384  if (fProof) fProof->SetActive();
2385 
2386  switch (oob_byte) {
2387 
2389  Info("HandleUrgentData", "*** Hard Interrupt");
2390 
2391  // If master server, propagate interrupt to slaves
2392  if (IsMaster())
2394 
2395  // Flush input socket
2396  while (1) {
2397  Int_t atmark;
2398 
2399  fSocket->GetOption(kAtMark, atmark);
2400 
2401  if (atmark) {
2402  // Send the OOB byte back so that the client knows where
2403  // to stop flushing its input stream of obsolete messages
2404  n = fSocket->SendRaw(&oob_byte, 1, kOob);
2405  if (n <= 0)
2406  Error("HandleUrgentData", "error sending OOB");
2407  break;
2408  }
2409 
2410  // find out number of bytes to read before atmark
2412  if (nch == 0) {
2413  gSystem->Sleep(1000);
2414  continue;
2415  }
2416 
2417  if (nch > kBufSize) nch = kBufSize;
2418  n = fSocket->RecvRaw(waste, nch);
2419  if (n <= 0) {
2420  Error("HandleUrgentData", "error receiving waste (2)");
2421  break;
2422  }
2423  }
2424 
2425  SendLogFile();
2426 
2427  break;
2428 
2430  Info("HandleUrgentData", "Soft Interrupt");
2431 
2432  // If master server, propagate interrupt to slaves
2433  if (IsMaster())
2435 
2436  if (wasted) {
2437  Error("HandleUrgentData", "soft interrupt flushed stream");
2438  break;
2439  }
2440 
2441  Interrupt();
2442 
2443  SendLogFile();
2444 
2445  break;
2446 
2448  Info("HandleUrgentData", "Shutdown Interrupt");
2449 
2450  // If master server, propagate interrupt to slaves
2451  if (IsMaster())
2453 
2454  Terminate(0);
2455 
2456  break;
2457 
2458  default:
2459  Error("HandleUrgentData", "unexpected OOB byte");
2460  break;
2461  }
2462 
2463  if (fProof) fProof->SetActive(kFALSE);
2464 }
2465 
2466 ////////////////////////////////////////////////////////////////////////////////
2467 /// Called when the client is not alive anymore (i.e. when kKeepAlive
2468 /// has failed).
2469 
2471 {
2472  // Real-time notification of messages
2474 
2475  if (IsMaster()) {
2476  // Check if we are here because client is closed. Try to ping client,
2477  // if that works, we are here because some slave died
2478  if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
2479  Info("HandleSigPipe", "keepAlive probe failed");
2480  // Tell slaves we are going to close since there is no client anymore
2481 
2482  fProof->SetActive();
2485  Terminate(0);
2486  }
2487  } else {
2488  Info("HandleSigPipe", "keepAlive probe failed");
2489  Terminate(0); // will not return from here....
2490  }
2491 }
2492 
2493 ////////////////////////////////////////////////////////////////////////////////
2494 /// True if in parallel mode.
2495 
2497 {
2498  if (IsMaster() && fProof)
2499  return fProof->IsParallel() || fProof->UseDynamicStartup() ;
2500 
2501  // false in case we are a slave
2502  return kFALSE;
2503 }
2504 
2505 ////////////////////////////////////////////////////////////////////////////////
2506 /// Print status of slave server.
2507 
2508 void TProofServ::Print(Option_t *option) const
2509 {
2510  if (IsMaster() && fProof)
2511  fProof->Print(option);
2512  else
2513  Printf("This is worker %s", gSystem->HostName());
2514 }
2515 
2516 ////////////////////////////////////////////////////////////////////////////////
2517 /// Redirect stdout to a log file. This log file will be flushed to the
2518 /// client or master after each command.
2519 
2520 void TProofServ::RedirectOutput(const char *dir, const char *mode)
2521 {
2522  char logfile[512];
2523 
2524  TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
2525  if (IsMaster()) {
2526  snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
2527  } else {
2528  snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
2529  }
2530 
2531  if ((freopen(logfile, mode, stdout)) == 0)
2532  SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
2533 
2534  if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2535  SysError("RedirectOutput", "could not redirect stderr");
2536 
2537  if ((fLogFile = fopen(logfile, "r")) == 0)
2538  SysError("RedirectOutput", "could not open logfile '%s'", logfile);
2539 
2540  // from this point on stdout and stderr are properly redirected
2541  if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
2542  Warning("RedirectOutput", "no way to tell master (or client) where"
2543  " to upload packages");
2544  }
2545 }
2546 
2547 ////////////////////////////////////////////////////////////////////////////////
2548 /// Reset PROOF environment to be ready for execution of next command.
2549 
2550 void TProofServ::Reset(const char *dir)
2551 {
2552  // First go to new directory. Check first that we got a reasonable path;
2553  // in PROOF-Lite it may not be the case
2554  TString dd(dir);
2555  if (!dd.BeginsWith("proofserv")) {
2556  Int_t ic = dd.Index(":");
2557  if (ic != kNPOS)
2558  dd.Replace(0, ic, "proofserv");
2559  }
2560  gDirectory->cd(dd.Data());
2561 
2562  // Clear interpreter environment.
2563  gROOT->Reset();
2564 
2565  // Make sure current directory is empty (don't delete anything when
2566  // we happen to be in the ROOT memory only directory!?)
2567  if (gDirectory != gROOT) {
2568  gDirectory->Delete();
2569  }
2570 
2571  if (IsMaster()) fProof->SendCurrentState();
2572 }
2573 
2574 ////////////////////////////////////////////////////////////////////////////////
2575 /// Receive a file, either sent by a client or a master server.
2576 /// If bin is true it is a binary file, otherwise it is an ASCII
2577 /// file and we need to check for Windows \r tokens. Returns -1 in
2578 /// case of error, 0 otherwise.
2579 
2581 {
2582  if (size <= 0) return 0;
2583 
2584  // open file, overwrite already existing file
2585  Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2586  if (fd < 0) {
2587  SysError("ReceiveFile", "error opening file %s", file);
2588  return -1;
2589  }
2590 
2591  const Int_t kMAXBUF = 16384; //32768 //16384 //65536;
2592  char buf[kMAXBUF], cpy[kMAXBUF];
2593 
2594  Int_t left, r;
2595  Long64_t filesize = 0;
2596 
2597  while (filesize < size) {
2598  left = Int_t(size - filesize);
2599  if (left > kMAXBUF)
2600  left = kMAXBUF;
2601  r = fSocket->RecvRaw(&buf, left);
2602  if (r > 0) {
2603  char *p = buf;
2604 
2605  filesize += r;
2606  while (r) {
2607  Int_t w;
2608 
2609  if (!bin) {
2610  Int_t k = 0, i = 0, j = 0;
2611  char *q;
2612  while (i < r) {
2613  if (p[i] == '\r') {
2614  i++;
2615  k++;
2616  }
2617  cpy[j++] = buf[i++];
2618  }
2619  q = cpy;
2620  r -= k;
2621  w = write(fd, q, r);
2622  } else {
2623  w = write(fd, p, r);
2624  }
2625 
2626  if (w < 0) {
2627  SysError("ReceiveFile", "error writing to file %s", file);
2628  close(fd);
2629  return -1;
2630  }
2631  r -= w;
2632  p += w;
2633  }
2634  } else if (r < 0) {
2635  Error("ReceiveFile", "error during receiving file %s", file);
2636  close(fd);
2637  return -1;
2638  }
2639  }
2640 
2641  close(fd);
2642 
2643  if (chmod(file, 0644) != 0)
2644  Warning("ReceiveFile", "error setting mode 0644 on file %s", file);
2645 
2646  return 0;
2647 }
2648 
2649 ////////////////////////////////////////////////////////////////////////////////
2650 /// Main server eventloop.
2651 
2653 {
2654  // Setup the server
2655  if (CreateServer() == 0) {
2656 
2657  // Run the main event loop
2658  TApplication::Run(retrn);
2659  }
2660 }
2661 
2662 ////////////////////////////////////////////////////////////////////////////////
2663 /// Send log file to master.
2664 /// If start > -1 send only bytes in the range from start to end,
2665 /// if end <= start send everything from start.
2666 
2667 void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
2668 {
2669  // Determine the number of bytes left to be read from the log file.
2670  fflush(stdout);
2671 
2672  // On workers we do not send the logs to masters (to avoid duplication of
2673  // text) unless asked explicitly, e.g. after an Exec(...) request.
2674  if (!IsMaster()) {
2675  if (!fSendLogToMaster) {
2676  FlushLogFile();
2677  } else {
2678  // Decide case by case
2680  }
2681  }
2682 
2683  off_t ltot=0, lnow=0;
2684  Int_t left = -1;
2685  Bool_t adhoc = kFALSE;
2686 
2687  if (fLogFileDes > -1) {
2688  ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2689  lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
2690 
2691  if (ltot >= 0 && lnow >= 0) {
2692  if (start > -1) {
2693  lseek(fLogFileDes, (off_t) start, SEEK_SET);
2694  if (end <= start || end > ltot)
2695  end = ltot;
2696  left = (Int_t)(end - start);
2697  if (end < ltot)
2698  left++;
2699  adhoc = kTRUE;
2700  } else {
2701  left = (Int_t)(ltot - lnow);
2702  }
2703  }
2704  }
2705 
2706  if (left > 0) {
2707  if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
2708  SysError("SendLogFile", "error sending kPROOF_LOGFILE");
2709  return;
2710  }
2711 
2712  const Int_t kMAXBUF = 32768; //16384 //65536;
2713  char buf[kMAXBUF];
2714  Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2715  Int_t len;
2716  do {
2717  while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
2718  TSystem::GetErrno() == EINTR)
2720 
2721  if (len < 0) {
2722  SysError("SendLogFile", "error reading log file");
2723  break;
2724  }
2725 
2726  if (end == ltot && len == wanted)
2727  buf[len-1] = '\n';
2728 
2729  if (fSocket->SendRaw(buf, len) < 0) {
2730  SysError("SendLogFile", "error sending log file");
2731  break;
2732  }
2733 
2734  // Update counters
2735  left -= len;
2736  wanted = (left > kMAXBUF) ? kMAXBUF : left;
2737 
2738  } while (len > 0 && left > 0);
2739  }
2740 
2741  // Restore initial position if partial send
2742  if (adhoc && lnow >=0 )
2743  lseek(fLogFileDes, lnow, SEEK_SET);
2744 
2745  TMessage mess(kPROOF_LOGDONE);
2746  if (IsMaster())
2747  mess << status << (fProof ? fProof->GetParallel() : 0);
2748  else
2749  mess << status << (Int_t) 1;
2750 
2751  if (fSocket->Send(mess) < 0) {
2752  SysError("SendLogFile", "error sending kPROOF_LOGDONE");
2753  return;
2754  }
2755 
2756  PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
2757 }
2758 
2759 ////////////////////////////////////////////////////////////////////////////////
2760 /// Send statistics of slave server to master or client.
2761 
2763 {
2764  Long64_t bytesread = TFile::GetFileBytesRead();
2765  Float_t cputime = fCpuTime, realtime = fRealTime;
2766  if (IsMaster()) {
2767  bytesread = fProof->GetBytesRead();
2768  cputime = fProof->GetCpuTime();
2769  }
2770 
2771  TMessage mess(kPROOF_GETSTATS);
2772  TString workdir = gSystem->WorkingDirectory(); // expect TString on other side
2773  mess << bytesread << realtime << cputime << workdir;
2774  if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
2775  mess << TString(gProofServ->GetImage());
2776  fSocket->Send(mess);
2777 }
2778 
2779 ////////////////////////////////////////////////////////////////////////////////
2780 /// Send number of parallel nodes to master or client.
2781 
2783 {
2784  Int_t nparallel = 0;
2785  if (IsMaster()) {
2786  PDB(kGlobal, 2)
2787  Info("SendParallel", "Will invoke AskParallel()");
2788  fProof->AskParallel();
2789  PDB(kGlobal, 2)
2790  Info("SendParallel", "Will invoke GetParallel()");
2791  nparallel = fProof->GetParallel();
2792  } else {
2793  nparallel = 1;
2794  }
2795 
2797  mess << nparallel << async;
2798  fSocket->Send(mess);
2799 }
2800 
2801 ////////////////////////////////////////////////////////////////////////////////
2802 /// Print the ProofServ logo on standard output.
2803 /// Return 0 on success, -1 on failure
2804 
2806 {
2807  char str[512];
2808 
2809  if (IsMaster()) {
2810  snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
2811  } else {
2812  snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
2813  }
2814 
2815  if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
2816  Error("Setup", "failed to send proof server startup message");
2817  return -1;
2818  }
2819 
2820  // exchange protocol level between client and master and between
2821  // master and slave
2822  Int_t what;
2823  if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
2824  Error("Setup", "failed to receive remote proof protocol");
2825  return -1;
2826  }
2827  if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
2828  Error("Setup", "failed to send local proof protocol");
2829  return -1;
2830  }
2831 
2832  // If old version, setup authentication related stuff
2833  if (fProtocol < 5) {
2834  TString wconf;
2835  if (OldAuthSetup(wconf) != 0) {
2836  Error("Setup", "OldAuthSetup: failed to setup authentication");
2837  return -1;
2838  }
2839  if (IsMaster()) {
2840  fConfFile = wconf;
2841  fWorkDir.Form("~/%s", kPROOF_WorkDir);
2842  } else {
2843  if (fProtocol < 4) {
2844  fWorkDir.Form("~/%s", kPROOF_WorkDir);
2845  } else {
2846  fWorkDir = wconf;
2847  if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
2848  }
2849  }
2850  } else {
2851 
2852  // Receive some useful information
2853  TMessage *mess;
2854  if ((fSocket->Recv(mess) <= 0) || !mess) {
2855  Error("Setup", "failed to receive ordinal and config info");
2856  return -1;
2857  }
2858  if (IsMaster()) {
2859  (*mess) >> fUser >> fOrdinal >> fConfFile;
2860  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2861  } else {
2862  (*mess) >> fUser >> fOrdinal >> fWorkDir;
2863  if (fWorkDir.IsNull())
2864  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2865  }
2866  // Set the correct prefix
2867  if (fOrdinal != "-1")
2868  fPrefix += fOrdinal;
2870  delete mess;
2871  }
2872 
2873  if (IsMaster()) {
2874 
2875  // strip off any prooftype directives
2876  TString conffile = fConfFile;
2877  conffile.Remove(0, 1 + conffile.Index(":"));
2878 
2879  // parse config file to find working directory
2880  TProofResourcesStatic resources(fConfDir, conffile);
2881  if (resources.IsValid()) {
2882  if (resources.GetMaster()) {
2883  TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
2884  if (tmpWorkDir != "")
2885  fWorkDir = tmpWorkDir;
2886  }
2887  } else {
2888  Info("Setup", "invalid config file %s (missing or unreadable",
2889  resources.GetFileName().Data());
2890  }
2891  }
2892 
2893  // Set $HOME and $PATH. The HOME directory was already set to the
2894  // user's home directory by proofd.
2895  gSystem->Setenv("HOME", gSystem->HomeDirectory());
2896 
2897  // Add user name in case of non default workdir
2898  if (fWorkDir.BeginsWith("/") &&
2900  if (!fWorkDir.EndsWith("/"))
2901  fWorkDir += "/";
2903  if (u) {
2904  fWorkDir += u->fUser;
2905  delete u;
2906  }
2907  }
2908 
2909  // Go to the main PROOF working directory
2911  if (gProofDebugLevel > 0)
2912  Info("Setup", "working directory set to %s", fWorkDir.Data());
2913 
2914  // host first name
2915  TString host = gSystem->HostName();
2916  if (host.Index(".") != kNPOS)
2917  host.Remove(host.Index("."));
2918 
2919  // Session tag
2920  fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
2921  (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
2923 
2924  // create session directory and make it the working directory
2926  if (IsMaster())
2927  fSessionDir += "/master-";
2928  else
2929  fSessionDir += "/slave-";
2931 
2932  // Common setup
2933  if (SetupCommon() != 0) {
2934  Error("Setup", "common setup failed");
2935  return -1;
2936  }
2937 
2938  // Incoming OOB should generate a SIGURG
2940 
2941  // Send packets off immediately to reduce latency
2942  fSocket->SetOption(kNoDelay, 1);
2943 
2944  // Check every two hours if client is still alive
2946 
2947  // Done
2948  return 0;
2949 }
2950 
2951 ////////////////////////////////////////////////////////////////////////////////
2952 /// Common part (between TProofServ and TXProofServ) of the setup phase.
2953 /// Return 0 on success, -1 on error
2954 
2956 {
2957  // deny write access for group and world
2958  gSystem->Umask(022);
2959 
2960 #ifdef R__UNIX
2961  // Add bindir to PATH
2962  TString path(gSystem->Getenv("PATH"));
2963  TString bindir(TROOT::GetBinDir());
2964  // Augment PATH, if required
2965  // ^<compiler>, <compiler>, ^<sysbin>, <sysbin>
2966  TString paths = gEnv->GetValue("ProofServ.BinPaths", "");
2967  if (paths.Length() > 0) {
2968  Int_t icomp = 0;
2969  if (paths.Contains("^<compiler>"))
2970  icomp = 1;
2971  else if (paths.Contains("<compiler>"))
2972  icomp = -1;
2973  if (icomp != 0) {
2974 # ifdef COMPILER
2975  TString compiler = COMPILER;
2976  if (compiler.Index("is ") != kNPOS)
2977  compiler.Remove(0, compiler.Index("is ") + 3);
2978  compiler = gSystem->GetDirName(compiler);
2979  if (icomp == 1) {
2980  if (!bindir.IsNull()) bindir += ":";
2981  bindir += compiler;
2982  } else if (icomp == -1) {
2983  if (!path.IsNull()) path += ":";
2984  path += compiler;
2985  }
2986 #endif
2987  }
2988  Int_t isysb = 0;
2989  if (paths.Contains("^<sysbin>"))
2990  isysb = 1;
2991  else if (paths.Contains("<sysbin>"))
2992  isysb = -1;
2993  if (isysb != 0) {
2994  if (isysb == 1) {
2995  if (!bindir.IsNull()) bindir += ":";
2996  bindir += "/bin:/usr/bin:/usr/local/bin";
2997  } else if (isysb == -1) {
2998  if (!path.IsNull()) path += ":";
2999  path += "/bin:/usr/bin:/usr/local/bin";
3000  }
3001  }
3002  }
3003  // Final insert
3004  if (!bindir.IsNull()) bindir += ":";
3005  path.Insert(0, bindir);
3006  gSystem->Setenv("PATH", path);
3007 #endif
3008 
3011  if (!gSystem->ChangeDirectory(fWorkDir)) {
3012  Error("SetupCommon", "can not change to PROOF directory %s",
3013  fWorkDir.Data());
3014  return -1;
3015  }
3016  } else {
3017  if (!gSystem->ChangeDirectory(fWorkDir)) {
3020  if (!gSystem->ChangeDirectory(fWorkDir)) {
3021  Error("SetupCommon", "can not change to PROOF directory %s",
3022  fWorkDir.Data());
3023  return -1;
3024  }
3025  }
3026  }
3027 
3028  // Set group
3029  fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
3030 
3031  // Check and make sure "cache" directory exists
3032  fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
3037  if (gProofDebugLevel > 0)
3038  Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
3039  fCacheLock =
3040  new TProofLockPath(TString::Format("%s/%s%s",
3042  TString(fCacheDir).ReplaceAll("/","%").Data()));
3043  // Make also sure the cache path is in the macro path
3045 
3046  // Check and make sure "packages" directory exists
3047  TString packdir = gEnv->GetValue("ProofServ.PackageDir",
3049  ResolveKeywords(packdir);
3050  if (gSystem->AccessPathName(packdir))
3051  gSystem->mkdir(packdir, kTRUE);
3052  fPackMgr = new TPackMgr(packdir);
3054  // Notification message
3055  TString noth;
3056  const char *k = (IsMaster()) ? "Mst" : "Wrk";
3057  noth.Form("%s-%s", k, fOrdinal.Data());
3058  fPackMgr->SetPrefix(noth.Data());
3059  if (gProofDebugLevel > 0)
3060  Info("SetupCommon", "package directory set to %s", packdir.Data());
3061 
3062  // Check and make sure "data" directory exists
3063  fDataDir = gEnv->GetValue("ProofServ.DataDir","");
3064  Ssiz_t isep = kNPOS;
3065  if (fDataDir.IsNull()) {
3066  // Use default
3067  fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
3068  } else if ((isep = fDataDir.Last(' ')) != kNPOS) {
3069  fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
3070  fDataDir.Remove(isep);
3071  }
3074  if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
3075  Warning("SetupCommon", "problems creating path '%s' (errno: %d)",
3077  }
3078  if (gProofDebugLevel > 0)
3079  Info("SetupCommon", "data directory set to %s", fDataDir.Data());
3080 
3081  // Check and apply possible options
3082  // (see http://root.cern.ch/drupal/content/configuration-reference-guide#datadir)
3083  TString dataDirOpts = gEnv->GetValue("ProofServ.DataDirOpts","");
3084  if (!dataDirOpts.IsNull()) {
3085  // Do they apply to this server type
3086  Bool_t doit = kTRUE;
3087  if ((IsMaster() && !dataDirOpts.Contains("M")) ||
3088  (!IsMaster() && !dataDirOpts.Contains("W"))) doit = kFALSE;
3089  if (doit) {
3090  // Get the wanted mode
3091  UInt_t m = 0755;
3092  if (dataDirOpts.Contains("g")) m = 0775;
3093  if (dataDirOpts.Contains("a") || dataDirOpts.Contains("o")) m = 0777;
3094  if (gProofDebugLevel > 0)
3095  Info("SetupCommon", "requested mode for data directories is '%o'", m);
3096  // Loop over paths
3097  FileStat_t st;
3098  TString p, subp;
3099  Int_t from = 0;
3100  if (fDataDir.BeginsWith("/")) p = "/";
3101  while (fDataDir.Tokenize(subp, from, "/")) {
3102  if (subp.IsNull()) continue;
3103  p += subp;
3104  if (gSystem->GetPathInfo(p, st) == 0) {
3105  if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
3106  if (gSystem->Chmod(p.Data(), m) != 0) {
3107  Warning("SetupCommon", "problems setting mode '%o' on path '%s' (errno: %d)",
3108  m, p.Data(), TSystem::GetErrno());
3109  break;
3110  }
3111  }
3112  p += "/";
3113  } else {
3114  Warning("SetupCommon", "problems stat-ing path '%s' (errno: %d; datadir: %s)",
3115  p.Data(), TSystem::GetErrno(), fDataDir.Data());
3116  break;
3117  }
3118  }
3119  }
3120  }
3121 
3122  // List of directories where to look for global packages
3123  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
3124 
3125  ResolveKeywords(globpack);
3126  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
3127  Info("SetupCommon", " %d global package directories registered", nglb);
3128  FlushLogFile();
3129 
3130  // Check the session dir
3131  if (fSessionDir != gSystem->WorkingDirectory()) {
3136  Error("SetupCommon", "can not change to working directory '%s'",
3137  fSessionDir.Data());
3138  return -1;
3139  }
3140  }
3141  gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
3142  if (gProofDebugLevel > 0)
3143  Info("SetupCommon", "session dir is '%s'", fSessionDir.Data());
3144 
3145  // On masters, check and make sure that "queries" and "datasets"
3146  // directories exist
3147  if (IsMaster()) {
3148 
3149  // Make sure that the 'queries' dir exist
3150  fQueryDir = fWorkDir;
3151  fQueryDir += TString("/") + kPROOF_QueryDir;
3155  fQueryDir += TString("/session-") + fTopSessionTag;
3158  if (gProofDebugLevel > 0)
3159  Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
3160 
3161  // Create 'queries' locker instance and lock it
3162  fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
3165  TString(fQueryDir).ReplaceAll("/","%").Data()));
3166  fQueryLock->Lock();
3167  // Create the query manager
3169  fQueryLock, 0);
3170  }
3171 
3172  // Server image
3173  fImage = gEnv->GetValue("ProofServ.Image", "");
3174 
3175  // Get the group priority
3176  if (IsMaster()) {
3177  // Send session tag to client
3179  m << fTopSessionTag << fGroup << fUser;
3180  fSocket->Send(m);
3181  // Group priority
3183  // Dataset manager instance via plug-in
3184  TPluginHandler *h = 0;
3185  TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
3186  if (!dsms.IsNull()) {
3187  TString dsm;
3188  Int_t from = 0;
3189  while (dsms.Tokenize(dsm, from, ",")) {
3191  Warning("SetupCommon", "a valid dataset manager already initialized");
3192  Warning("SetupCommon", "support for multiple managers not yet available");
3193  break;
3194  }
3195  // Get plugin manager to load the appropriate TDataSetManager
3196  if (gROOT->GetPluginManager()) {
3197  // Find the appropriate handler
3198  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
3199  if (h && h->LoadPlugin() != -1) {
3200  // make instance of the dataset manager
3201  fDataSetManager =
3202  reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
3203  fUser.Data(), dsm.Data()));
3204  }
3205  }
3206  }
3207  // Check the result of the dataset manager initialization
3209  Warning("SetupCommon", "dataset manager plug-in initialization failed");
3210  SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3212  }
3213  } else {
3214  // Initialize the default dataset manager
3215  TString opts("Av:");
3216  TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
3217  if (dsetdir.IsNull()) {
3218  // Use the default in the sandbox
3219  dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
3222  opts += "Sb:";
3223  }
3224  // Find the appropriate handler
3225  if (!h) {
3226  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
3227  if (h && h->LoadPlugin() == -1) h = 0;
3228  }
3229  if (h) {
3230  // make instance of the dataset manager
3231  TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
3232  fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
3233  fGroup.Data(), fUser.Data(), oo.Data()));
3234  }
3236  Warning("SetupCommon", "default dataset manager plug-in initialization failed");
3238  }
3239  }
3240  // Dataset manager for staging requests
3241  TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
3242  if (!dsReqCfg.IsNull()) {
3243  TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
3244 
3245  if (reReqDir.Match(dsReqCfg) == 5) {
3246  TString dsDirFmt;
3247  dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
3248  fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_",
3249  dsDirFmt);
3250  if (fDataSetStgRepo &&
3252  Warning("SetupCommon",
3253  "failed init of dataset staging requests repository");
3255  }
3256  } else {
3257  Warning("SetupCommon",
3258  "specify, with [dir:]<path>, a valid path for staging requests");
3259  }
3260  } else if (gProofDebugLevel > 0) {
3261  Warning("SetupCommon", "no repository for staging requests available");
3262  }
3263  }
3264 
3265  // Quotas
3266  TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
3267  if (quotas.IsNull())
3268  quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
3269  if (quotas.IsNull())
3270  quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
3271  if (!quotas.IsNull()) {
3272  // Parse it; format ("maxquerykept=10 hwmsz=800m maxsz=1g")
3273  TString tok;
3274  Ssiz_t from = 0;
3275  while (quotas.Tokenize(tok, from, " ")) {
3276  // Set max number of query results to keep
3277  if (tok.BeginsWith("maxquerykept=")) {
3278  tok.ReplaceAll("maxquerykept=","");
3279  if (tok.IsDigit())
3280  fMaxQueries = tok.Atoi();
3281  else
3282  Info("SetupCommon",
3283  "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3284  }
3285  // Set High-Water-Mark or max on the sandbox size
3286  const char *ksz[2] = {"hwmsz=", "maxsz="};
3287  for (Int_t j = 0; j < 2; j++) {
3288  if (tok.BeginsWith(ksz[j])) {
3289  tok.ReplaceAll(ksz[j],"");
3290  Long64_t fact = -1;
3291  if (!tok.IsDigit()) {
3292  // Parse (k, m, g)
3293  tok.ToLower();
3294  const char *s[3] = {"k", "m", "g"};
3295  Int_t i = 0, ki = 1024;
3296  while (fact < 0) {
3297  if (tok.EndsWith(s[i++]))
3298  fact = ki;
3299  else
3300  ki *= 1024;
3301  }
3302  tok.Remove(tok.Length()-1);
3303  }
3304  if (tok.IsDigit()) {
3305  if (j == 0)
3306  fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3307  else
3308  fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3309  } else {
3310  TString ssz(ksz[j], strlen(ksz[j])-1);
3311  Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
3312  }
3313  }
3314  }
3315  }
3316  }
3317 
3318  // Apply quotas, if any
3319  if (IsMaster() && fQMgr)
3320  if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
3321  Warning("SetupCommon", "problems applying fMaxQueries");
3322 
3323  // Send "ROOTversion|ArchCompiler" flag
3324  if (fProtocol > 12) {
3325  TString vac = gROOT->GetVersion();
3326  vac += TString::Format(":%s", gROOT->GetGitCommit());
3327  TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
3328  if (rtag.Length() > 0)
3329  vac += TString::Format(":%s", rtag.Data());
3332  m << vac;
3333  fSocket->Send(m);
3334  }
3335 
3336  // Set user vars in TProof
3337  TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
3338  TString name;
3339  Int_t from = 0;
3340  while (all_vars.Tokenize(name, from, ",")) {
3341  if (!name.IsNull()) {
3342  TString value = gSystem->Getenv(name);
3343  TProof::AddEnvVar(name, value);
3344  }
3345  }
3346 
3347  if (fgLogToSysLog > 0) {
3348  // Set the syslog entity (all the information is available now)
3349  if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
3350  fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
3351  } else if (!(fUser.IsNull()) && fGroup.IsNull()) {
3352  fgSysLogEntity.Form("%s:default", fUser.Data());
3353  } else if (fUser.IsNull() && !(fGroup.IsNull())) {
3354  fgSysLogEntity.Form("undef:%s", fGroup.Data());
3355  }
3356  // Log the beginning of this session
3357  TString s;
3358  s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
3359  gSystem->Syslog(kLogNotice, s.Data());
3360  }
3361 
3362  if (gProofDebugLevel > 0)
3363  Info("SetupCommon", "successfully completed");
3364 
3365  // Done
3366  return 0;
3367 }
3368 
3369 ////////////////////////////////////////////////////////////////////////////////
3370 /// Terminate the proof server.
     /// In order: logs the end of the session to syslog (if enabled), reports the
     /// process memory footprint, cleans up the session / queries / data
     /// directories and finally asks the system event loop to exit.
     /// `status` (as used in the body) is written to the syslog record; the
     /// session directory clean-up is attempted only when it is 0.
     /// NOTE(review): this listing omits several source lines (embedded numbers
     /// 3372, 3393, 3406, 3415, 3422 and 3427 are absent), including the function
     /// signature; check the real file before editing this block.
3371 
3373 {
3374  if (fgLogToSysLog > 0) {
      // End-of-session record: "<entity> -1 <real time> <cpu time> <status>"
3375  TString s;
3376  s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
3377  gSystem->Syslog(kLogNotice, s.Data());
3378  }
3379 
3380  // Notify the memory footprint
3381  ProcInfo_t pi;
3382  if (!gSystem->GetProcInfo(&pi)){
3383  Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3384  pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
3385  }
3386 
3387  // Cleanup session directory
3388  if (status == 0) {
3389  // make sure we remain in a "connected" directory
3390  gSystem->ChangeDirectory("/");
3391  // needed in case fSessionDir is on NFS ?!
3392  gSystem->MakeDirectory(fSessionDir+"/.delete");
      // NOTE(review): embedded line 3393 is missing here; presumably it removes
      // fSessionDir (compare the queries-dir clean-up below) - confirm.
3394  }
3395 
3396  // Cleanup queries directory if empty
3397  if (IsMaster()) {
      // "Empty" here means: no query manager, no query list, or a list of size 0
3398  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
3399  // make sure we remain in a "connected" directory
3400  gSystem->ChangeDirectory("/");
3401  // needed in case fQueryDir is on NFS ?!
3402  gSystem->MakeDirectory(fQueryDir+"/.delete");
3403  gSystem->Exec(TString::Format("%s %s", kRM, fQueryDir.Data()));
3404  // Remove lock file
3405  if (fQueryLock)
      // NOTE(review): the statement guarded by the `if` above (embedded line
      // 3406) is missing from this listing.
3407  }
3408 
3409  // Unlock the query dir owned by this session
3410  if (fQueryLock)
3411  fQueryLock->Unlock();
3412  }
3413 
3414  // Cleanup data directory if empty
      // NOTE(review): the condition opening this block (embedded line 3415) is
      // missing from this listing.
3416  if (UnlinkDataDir(fDataDir))
3417  Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
3418  }
3419 
3420  // Remove input handler to avoid spurious signals in socket
3421  // selection for closing activities executed upon exit()
      // NOTE(review): the declaration of the iterator `next` (embedded line 3422)
      // and the action taken on `ih` (embedded line 3427) are missing here.
3423  TObject *fh = 0;
3424  while ((fh = next())) {
3425  TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
3426  if (ih)
3428  }
3429 
3430  // Stop processing events
3431  gSystem->ExitLoop();
3432 
3433  // Exit() is called in pmain
3434 }
3435 
3436 ////////////////////////////////////////////////////////////////////////////////
3437 /// Scan recursively the datadir and unlink it if empty
3438 /// Return kTRUE if it can be unlinked, kFALSE otherwise
     /// A directory counts as "empty" when it contains nothing but (recursively)
     /// empty sub-directories: the first entry that is not a directory, or whose
     /// path info cannot be read, stops the scan and gives kFALSE. A null or
     /// empty `path`, or a directory that cannot be opened, also gives kFALSE.
     /// NOTE(review): the signature line (embedded number 3440) is missing from
     /// this listing.
     /// NOTE(review): when the final Unlink() fails only a warning is printed and
     /// kTRUE is still returned - confirm callers are fine with that.
3439 
3441 {
3442  if (!path || strlen(path) <= 0) return kFALSE;
3443 
      // `dorm` ("do remove") stays kTRUE only as long as nothing but empty
      // sub-directories has been found
3444  Bool_t dorm = kTRUE;
3445  void *dirp = gSystem->OpenDirectory(path);
3446  if (dirp) {
3447  TString fpath;
3448  const char *ent = 0;
      // The loop stops at the first entry that prevents removal
3449  while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
3450  if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
3451  fpath.Form("%s/%s", path, ent);
3452  FileStat_t st;
3453  if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
      // Sub-directory: removable only if it is itself (recursively) empty
3454  dorm = UnlinkDataDir(fpath);
3455  } else {
      // Anything else (or an entry we cannot stat) blocks removal
3456  dorm = kFALSE;
3457  }
3458  }
3459  // Close the directory
3460  gSystem->FreeDirectory(dirp);
3461  } else {
3462  // Cannot open the directory
3463  dorm = kFALSE;
3464  }
3465 
3466  // Do remove, if required
3467  if (dorm && gSystem->Unlink(path) != 0)
3468  Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
3469  // done
3470  return dorm;
3471 }
3472 
3473 ////////////////////////////////////////////////////////////////////////////////
3474 /// Static function that returns kTRUE in case we are a PROOF server.
     /// The test is simply whether the global gProofServ pointer is non-null.
     /// NOTE(review): the signature line (embedded number 3476) is missing from
     /// this listing.
3475 
3477 {
3478  return gProofServ ? kTRUE : kFALSE;
3479 }
3480 
3481 ////////////////////////////////////////////////////////////////////////////////
3482 /// Static function returning pointer to global object gProofServ.
3483 /// Mainly for use via CINT, where the gProofServ symbol might be
3484 /// deleted from the symbol table.
     /// The pointer is returned as is; no null check is done here.
     /// NOTE(review): the signature line (embedded number 3486) is missing from
     /// this listing.
3485 
3487 {
3488  return gProofServ;
3489 }
3490 
3491 ////////////////////////////////////////////////////////////////////////////////
3492 /// Setup authentication related stuff for old versions.
3493 /// Provided for backward compatibility.
     /// Locates and loads "libRootAuth", looks up the symbol
     /// "OldProofServAuthSetup" in it and forwards the call to that hook with
     /// the socket, the master flag, the protocol, the user, the ordinal and
     /// `conf`. Returns whatever the hook returns; failures to locate the
     /// library or the symbol return -1.
     /// NOTE(review): the signature line (embedded number 3495) is missing from
     /// this listing.
3494 
3496 {
      // NOTE(review): as written this is a plain local initialised to 0, so the
      // `if (!oldAuthSetupHook)` below is always true and the library lookup is
      // repeated on every call - confirm whether it was meant to be cached.
3497  OldProofServAuthSetup_t oldAuthSetupHook = 0;
3498 
3499  if (!oldAuthSetupHook) {
3500  // Load libraries needed for (server) authentication ...
3501  TString authlib = "libRootAuth";
3502  char *p = 0;
3503  // The generic one
3504  if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
      // Only the existence of the path matters: the returned string is freed
3505  delete[] p;
3506  if (gSystem->Load(authlib) == -1) {
3507  Error("OldAuthSetup", "can't load %s",authlib.Data());
      // NOTE(review): this failure path returns kFALSE while the other failure
      // paths below return -1 - confirm which value callers treat as an error.
3508  return kFALSE;
3509  }
3510  } else {
3511  Error("OldAuthSetup", "can't locate %s",authlib.Data());
3512  return -1;
3513  }
3514  //
3515  // Locate OldProofServAuthSetup
3516  Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
3517  if (f)
3518  oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
3519  else {
3520  Error("OldAuthSetup", "can't find OldProofServAuthSetup");
3521  return -1;
3522  }
3523  }
3524  //
3525  // Setup
3526  return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
3527  fUser, fOrdinal, conf);
3528 }
3529 
3530 ////////////////////////////////////////////////////////////////////////////////
3531 /// Create a TProofQueryResult instance for this query.
     /// The new object is built from the sequential number taken from the query
     /// manager (-1 when there is no manager) and from the arguments, and is
     /// returned to the caller. While it is being created the dataset's
     /// kWriteV3 bit, if set, is temporarily switched off and then restored.
     /// NOTE(review): the first signature line (embedded number 3533, which
     /// declares `nent`) and two body lines (3542, 3555) are missing from this
     /// listing.
3532 
3534  const char *opt,
3535  TList *inlist, Long64_t fst,
3536  TDSet *dset, const char *selec,
3537  TObject *elist)
3538 {
3539  // Increment sequential number
3540  Int_t seqnum = -1;
3541  if (fQMgr) {
      // NOTE(review): the statement doing the increment (embedded line 3542)
      // is missing from this listing.
3543  seqnum = fQMgr->SeqNum();
3544  }
3545 
3546  // Locally we always use the current streamer
3547  Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
3548  if (olds)
3549  dset->SetWriteV3(kFALSE);
3550 
3551  // Create the instance and add it to the list
3552  TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
3553  fst, dset, selec, elist);
3554  // Title is the session identifier
      // NOTE(review): the statement setting the title (embedded line 3555) is
      // missing from this listing.
3556 
3557  // Restore old streamer info
3558  if (olds)
3559  dset->SetWriteV3(kTRUE);
3560 
3561  return pqr;
3562 }
3563 
3564 ////////////////////////////////////////////////////////////////////////////////
3565 /// Set query in running state.
3566 
3568 {
3569  // Record current position in the log file at start
3570  fflush(stdout);
3571  Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3572 
3573  // Add some header to logs
3574  Printf(" ");
3575  Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
3576 
3577  // Build the list of loaded PAR packages
3578  TString parlist = "";
3579  fPackMgr->GetEnabledPackages(parlist);
3580 
3581  if (fProof) {
3582  // Set in running state
3583  pq->SetRunning(startlog, parlist, fProof->GetParallel());
3584 
3585  // Bytes and CPU at start (we will calculate the differential at end)
3586  pq->SetProcessInfo(pq->GetEntries(),
3588  } else {
3589  // Set in running state
3590  pq->SetRunning(startlog, parlist, -1);
3591 
3592  // Bytes and CPU at start (we will calculate the differential at end)
3593  pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
3594  }
3595 }
3596 
3597 ////////////////////////////////////////////////////////////////////////////////
3598 /// Handle archive request.
     /// Reads a query reference and a destination path from `mess`.
     ///  - If the reference is "Default" the path is only stored as the default
     ///    archive path.
     ///  - Otherwise the query is looked up via the query manager (or, failing
     ///    that, read back from "<qdir>/query-result.root"), flagged as archived
     ///    and written to the archive file, which is created if missing and
     ///    updated otherwise.
     /// When the message carries no path, one is built from the default archive
     /// path; if that is empty too nothing is done.
     /// `slb`, when non-null, receives "<queryref> <path>" as read from the
     /// message.
     /// NOTE(review): the signature line (embedded number 3600) is missing from
     /// this listing.
3599 
3601 {
3602  PDB(kGlobal, 1)
3603  Info("HandleArchive", "Enter");
3604 
3605  TString queryref;
3606  TString path;
3607  (*mess) >> queryref >> path;
3608 
3609  if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
3610 
3611  // If this is a set default action just save the default
3612  if (queryref == "Default") {
3613  fArchivePath = path;
3614  Info("HandleArchive",
3615  "default path set to %s", fArchivePath.Data());
3616  return;
3617  }
3618 
3619  Int_t qry = -1;
3620  TString qdir;
3621  TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
      // Keep a handle on the in-memory instance: `pqr` may be replaced below by
      // a copy read from file, and both get flagged as archived
3622  TProofQueryResult *pqm = pqr;
3623 
      // No explicit path: derive one from the default archive path
3624  if (path.Length() <= 0) {
3625  if (fArchivePath.Length() <= 0) {
3626  Info("HandleArchive",
3627  "archive paths are not defined - do nothing");
3628  return;
3629  }
      // NOTE(review): this tests `qry > 0` while the checks further down use
      // `qry < 0` / `qry > -1`, so qry == 0 takes the `else` branch here -
      // confirm whether 0 is a valid query number.
3630  if (qry > 0) {
3631  path.Form("%s/session-%s-%d.root",
3632  fArchivePath.Data(), fTopSessionTag.Data(), qry);
3633  } else {
3634  path = queryref;
3635  path.ReplaceAll(":q","-");
3636  path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
3637  path += ".root";
3638  }
3639  }
3640 
3641  // Build file name for specific query
3642  if (!pqr || qry < 0) {
3643  TString fout = qdir;
3644  fout += "/query-result.root";
3645 
3646  TFile *f = TFile::Open(fout,"READ");
3647  pqr = 0;
3648  if (f) {
      // Take the first key of class TProofQueryResult that can be read
3649  f->ReadKeys();
3650  TIter nxk(f->GetListOfKeys());
3651  TKey *k = 0;
3652  while ((k = (TKey *)nxk())) {
3653  if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
3654  pqr = (TProofQueryResult *) f->Get(k->GetName());
3655  if (pqr)
3656  break;
3657  }
3658  }
      // NOTE(review): `pqr` is used after the file it was read from is closed
      // and deleted, and is not deleted in this function - confirm the
      // ownership / lifetime of the object returned by TFile::Get() here.
3659  f->Close();
3660  delete f;
3661  } else {
3662  Info("HandleArchive",
3663  "file cannot be open (%s)",fout.Data());
3664  return;
3665  }
3666  }
3667 
3668  if (pqr) {
3669 
3670  PDB(kGlobal, 1) Info("HandleArchive",
3671  "archive path for query #%d: %s",
3672  qry, path.Data());
3673  TFile *farc = 0;
      // The "NEW" branch is taken when AccessPathName() returns true, the
      // "UPDATE" branch otherwise
3674  if (gSystem->AccessPathName(path))
3675  farc = TFile::Open(path,"NEW");
3676  else
3677  farc = TFile::Open(path,"UPDATE");
3678  if (!farc || !(farc->IsOpen())) {
3679  Info("HandleArchive",
3680  "archive file cannot be open (%s)",path.Data());
3681  return;
3682  }
3683  farc->cd();
3684 
3685  // Update query status
3686  pqr->SetArchived(path);
3687  if (pqm)
3688  pqm->SetArchived(path);
3689 
3690  // Write to file
3691  pqr->Write();
3692 
3693  // Update temporary files too
3694  if (qry > -1 && fQMgr)
3695  fQMgr->SaveQuery(pqr);
3696 
3697  // Notify
3698  Info("HandleArchive",
3699  "results of query %s archived to file %s",
3700  queryref.Data(), path.Data());
      // NOTE(review): `farc` is neither closed nor deleted anywhere in this
      // function (nor on the early return above when it is non-null but not
      // open) - looks like a leaked file handle unless it is cleaned up
      // elsewhere; confirm.
3701  }
3702 
3703  // Done
3704  return;
3705 }
3706 
3707 ////////////////////////////////////////////////////////////////////////////////
3708 /// Get a map {server-name, list-of-files} for collection 'fc' to be used in
3709 /// TPacketizerFile. Returns a pointer to the map (ownership of the caller).
3710 /// Or (TMap *)0 and an error message in emsg.
     /// The key of each map entry is "<protocol>://<host FQDN>[:<port>]" built
     /// from the current URL of each TFileInfo; the value is a THashList holding
     /// the TFileInfo objects served by that host. `emsg` is cleared on entry.
     /// NOTE(review): the signature line (embedded number 3712) is missing from
     /// this listing.
     /// NOTE(review): the per-server lists are created owning (SetOwner(kTRUE))
     /// but are filled with the very pointers obtained from fc->GetList(); if
     /// `fc` also owns them this looks like a double-delete hazard - confirm
     /// how callers dispose of `fc` and of the map.
3711 
3713 {
3714  TMap *fcmap = 0;
3715  emsg = "";
3716 
3717  // Sanity checks
3718  if (!fc) {
3719  emsg.Form("file collection undefined!");
3720  return fcmap;
3721  }
3722 
3723  // Prepare data set map
3724  fcmap = new TMap();
3725 
3726  TIter nxf(fc->GetList());
3727  TFileInfo *fiind = 0;
3728  TString key;
3729  while ((fiind = (TFileInfo *)nxf())) {
      // NOTE(review): `xurl` is dereferenced without a null check - confirm
      // GetCurrentUrl() cannot return 0 for the entries found here.
3730  TUrl *xurl = fiind->GetCurrentUrl();
3731  // Find the key for this server
3732  key.Form("%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
3733  if (xurl->GetPort() > 0)
3734  key += TString::Format(":%d", xurl->GetPort());
3735  // Get the map entry for this key
3736  TPair *ent = 0;
3737  THashList* l = 0;
3738  if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
3739  // Attach to the list
3740  l = (THashList *) ent->Value();
3741  } else {
3742  // Create list
3743  l = new THashList;
3744  l->SetOwner(kTRUE);
3745  // Add it to the map
3746  fcmap->Add(new TObjString(key.Data()), l);
3747  }
3748  // Add fileinfo with index to list
3749  l->Add(fiind);
3750  }
3751 
3752  // Done
3753  return fcmap;
3754 }
3755 
3756 ////////////////////////////////////////////////////////////////////////////////
3757 /// Handle processing request.
     /// Reads the dataset, the selector name, the input list, the options, the
     /// entry range, the event / entry list and the sync flag from `mess`, then
     /// follows one of two paths:
     ///  - IsTopMaster(): validates the dataset, creates and queues a
     ///    TProofQueryResult, obtains the workers and runs the waiting queries
     ///    through ProcessNext(), notifying the client along the way;
     ///  - otherwise: creates the player, feeds it the input list, calls
     ///    fPlayer->Process() and reports / sends the output upstream.
     /// `slb`, when non-null, is passed on to ProcessNext() in the first path
     /// and is filled with a short "<exit status> <entries> <bytes>" summary in
     /// the second one.
     /// NOTE(review): this listing omits many source lines, including the
     /// signature and several declarations (embedded numbers 3759, 3771, 3925,
     /// 4035, 4054, 4055, 4058, 4076, 4083, 4084, 4105, 4120, 4145, 4170, 4185
     /// and 4194 are absent); check the real file before editing this block.
3758 
3760 {
3761  PDB(kGlobal, 1)
3762  Info("HandleProcess", "Enter");
3763 
3764  // Nothing to do for slaves if we are not idle
3765  if (!IsTopMaster() && !IsIdle())
3766  return;
3767 
3768  TDSet *dset;
3769  TString filename, opt;
3770  TList *input;
      // NOTE(review): the declaration of `nentries` and `first` (embedded line
      // 3771) is missing from this listing.
3772  TEventList *evl = 0;
3773  TEntryList *enl = 0;
3774  Bool_t sync;
3775 
3776  (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
3777  // Get entry list information, if any (support started with fProtocol == 15)
3778  if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
3779  (*mess) >> enl;
      // "No data" means: no dataset at all, or a dataset flagged as empty
3780  Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
3781 
3782  // Priority to the entry list
3783  TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
3784  if (enl && evl)
3785  // Cannot specify both at the same time
3786  SafeDelete(evl);
3787  if ((!hasNoData) && elist)
3788  dset->SetEntryList(elist);
3789 
3790  if (IsTopMaster()) {
3791 
3792  TString emsg;
3793  // Make sure the dataset contains the information needed
3794  if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
3795  if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
3796  SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
3797  fPrefix.Data(), emsg.Data()));
3798  Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
3799  // To terminate collection
3800  if (sync) SendLogFile();
3801  return;
3802  }
3803  } else if (hasNoData) {
3804  // Check if we are required to process with TPacketizerFile a registered dataset
      // NOTE(review): `input` is dereferenced here (and further down) without
      // a null check - confirm it is always non-null after streaming.
3805  TNamed *ftp = dynamic_cast<TNamed *>(input->FindObject("PROOF_FilesToProcess"));
3806  if (ftp) {
3807  TString dsn(ftp->GetTitle());
      // A title without ':' or starting with "dataset:" is taken as the name
      // of a registered dataset
3808  if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
3809  dsn.ReplaceAll("dataset:", "");
3810  // Get the map for TPacketizerFile
3811  // Make sure we have something in input and a dataset manager
3812  if (!fDataSetManager) {
3813  emsg.Form("dataset manager not initialized!");
3814  } else {
3815  TFileCollection *fc = 0;
3816  // Get the dataset
3817  if (!(fc = fDataSetManager->GetDataSet(dsn))) {
3818  emsg.Form("requested dataset '%s' does not exists", dsn.Data());
3819  } else {
3820  TMap *fcmap = GetDataSetNodeMap(fc, emsg);
3821  if (fcmap) {
      // Replace the named placeholder with the map, under the same name
3822  input->Remove(ftp);
3823  delete ftp;
3824  fcmap->SetOwner(kTRUE);
3825  fcmap->SetName("PROOF_FilesToProcess");
3826  input->Add(fcmap);
3827  }
3828  }
3829  }
3830  if (!emsg.IsNull()) {
3831  SendAsynMessage(TString::Format("HandleProcess on %s: %s",
3832  fPrefix.Data(), emsg.Data()));
3833  Error("HandleProcess", "%s", emsg.Data());
3834  // To terminate collection
3835  if (sync) SendLogFile();
3836  return;
3837  }
3838  }
3839  }
3840  }
3841 
3842  TProofQueryResult *pq = 0;
3843 
3844  // Create instance of query results; we set ownership of the input list
3845  // to the TQueryResult object, to avoid too many instantiations
3846  pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
3847 
3848  // Prepare the input list and transfer it into the TQueryResult object
3849  if (dset) input->Add(dset);
3850  if (elist) input->Add(elist);
3851  pq->SetInputList(input, kTRUE);
3852 
3853  // Clear the list
3854  input->Clear("nodelete");
3855  SafeDelete(input);
3856 
3857  // Save input data, if any
3858  if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
3859  Warning("HandleProcess", "could not save input data: %s", emsg.Data());
3860 
3861  // If not a draw action add the query to the main list
3862  if (!(pq->IsDraw())) {
3863  if (fQMgr) {
3864  if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
3865  // Also save it to queries dir
3866  fQMgr->SaveQuery(pq);
3867  }
3868  }
3869 
3870  // Add anyhow to the waiting lists
3871  QueueQuery(pq);
3872 
3873  // Call get Workers
3874  // if we are not idle the scheduler will just enqueue the query and
3875  // send a resume message later.
3876 
3877  Bool_t enqueued = kFALSE;
3878  Int_t pc = 0;
3879  // if the session does not have workers and is in the dynamic mode
3880  if (fProof->UseDynamicStartup()) {
3881  // get the list of workers and start them
      // NOTE(review): `workerList` is allocated here and not deleted on any of
      // the visible paths (including the early returns) - confirm whether
      // GetWorkers() / AddWorkers() take ownership.
3882  TList* workerList = new TList();
3883  EQueryAction retVal = GetWorkers(workerList, pc);
3884  if (retVal == TProofServ::kQueryStop) {
3885  Error("HandleProcess", "error getting list of worker nodes");
3886  // To terminate collection
3887  if (sync) SendLogFile();
3888  return;
3889  } else if (retVal == TProofServ::kQueryEnqueued) {
3890  // change to an asynchronous query
3891  enqueued = kTRUE;
3892  Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3893  } else {
3894  Int_t ret = fProof->AddWorkers(workerList);
3895  if (ret < 0) {
3896  Error("HandleProcess", "Adding a list of worker nodes returned: %d",
3897  ret);
3898  // To terminate collection
3899  if (sync) SendLogFile();
3900  return;
3901  }
3902  }
3903  } else {
3904  EQueryAction retVal = GetWorkers(0, pc);
3905  if (retVal == TProofServ::kQueryStop) {
3906  Error("HandleProcess", "error getting list of worker nodes");
3907  // To terminate collection
3908  if (sync) SendLogFile();
3909  return;
3910  } else if (retVal == TProofServ::kQueryEnqueued) {
3911  // change to an asynchronous query
3912  enqueued = kTRUE;
3913  Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3914  } else if (retVal != TProofServ::kQueryOK) {
3915  Error("HandleProcess", "unknown return value: %d", retVal);
3916  // To terminate collection
3917  if (sync) SendLogFile();
3918  return;
3919  }
3920  }
3921 
3922  // If the client submission was asynchronous, signal the submission of
3923  // the query and communicate the assigned sequential number for later
3924  // identification
      // NOTE(review): the declaration of the message `m` (embedded line 3925)
      // is missing from this listing.
3926  if (!sync || enqueued) {
3927  m << pq->GetSeqNum() << kFALSE;
3928  fSocket->Send(m);
3929  }
3930 
3931  // Nothing more to do if we are not idle
3932  if (!IsIdle()) {
3933  // Notify submission
3934  Info("HandleProcess",
3935  "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
3936  return;
3937  }
3938 
3939  // Process
3940  // in the static mode, if a session is enqueued it will be processed after current query
3941  // (there is no way to enqueue if idle).
3942  // in the dynamic mode we will process here only if the session was idle and got workers!
3943  Bool_t doprocess = kFALSE;
3944  while (WaitingQueries() > 0 && !enqueued) {
3945  doprocess = kTRUE;
3946  //
3947  ProcessNext(slb);
3948  // avoid processing async queries sent during processing in dyn mode
3949  if (fProof->UseDynamicStartup())
3950  enqueued = kTRUE;
3951 
3952  } // Loop on submitted queries
3953 
3954  // Set idle
3955  SetIdle(kTRUE);
3956 
3957  // Reset mergers
3958  fProof->ResetMergers();
3959 
3960  // kPROOF_SETIDLE sets the client to idle; in asynchronous mode clients monitor
3961  // TProof::IsIdle to check the readiness of a query, so we need to send this
3962  // before, to be sure that everything about a query is received by the client
3963  if (!sync) SendLogFile();
3964 
3965  // Signal the client that we are idle
3966  if (doprocess) {
3967  m.Reset(kPROOF_SETIDLE);
3968  Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
3969  m << waiting;
3970  fSocket->Send(m);
3971  }
3972 
3973  // In synchronous mode TProof::Collect is terminated by the reception of the
3974  // log file and subsequent submissions are controlled by TProof::IsIdle(), so
3975  // this must be last one to be sent
3976  if (sync) SendLogFile();
3977 
3978  // Set idle
3979  SetIdle(kTRUE);
3980 
3981  } else {
3982 
3983  // Reset compute stopwatch: we include all what done from now on
3984  fCompute.Reset();
3985  fCompute.Start();
3986 
3987  // Set not idle
3988  SetIdle(kFALSE);
3989 
3990  // Cleanup the player
      // `deleteplayer` is switched off below whenever the player is still
      // needed after this call (controlled output sending or sub-merging)
3991  Bool_t deleteplayer = kTRUE;
3992  MakePlayer();
3993 
3994  // Setup data set
3995  if (dset && (dset->IsA() == TDSetProxy::Class()))
3996  ((TDSetProxy*)dset)->SetProofServ(this);
3997 
3998  // Get input data, if any
3999  TString emsg;
4000  if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
4001  Warning("HandleProcess", "could not get input data: %s", emsg.Data());
4002 
4003  // Get query sequential number
4004  if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
4005  Warning("HandleProcess", "could not get query sequential number!");
4006 
4007  // Make the ordinal number available in the selector
      // Any existing "PROOF_Ordinal" entries are removed from the list first
      // (they are not deleted here)
4008  TObject *nord = 0;
4009  while ((nord = input->FindObject("PROOF_Ordinal")))
4010  input->Remove(nord);
4011  input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
4012 
4013  // Set input
4014  TIter next(input);
4015  TObject *o = 0;
4016  while ((o = next())) {
4017  PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
4018  fPlayer->AddInput(o);
4019  }
4020 
4021  // Check if a TSelector object is passed via input list
      // The first such object wins and its class name replaces `filename`
4022  TObject *obj = 0;
4023  TSelector *selector_obj = 0;
4024  TIter nxt(input);
4025  while ((obj = nxt())){
4026  if (obj->InheritsFrom("TSelector")) {
4027  selector_obj = (TSelector *) obj;
4028  filename = selector_obj->ClassName();
4029  Info("HandleProcess", "selector obj for '%s' found", selector_obj->ClassName());
4030  break;
4031  }
4032  }
4033 
4034  // Signal the master that we are starting processing
      // NOTE(review): the statement doing the signalling (embedded line 4035)
      // is missing from this listing.
4036 
4037  // Reset latency stopwatch
4038  fLatency.Reset();
4039  fSaveOutput.Reset();
4040 
4041  // Process
4042  PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
4043 
4044  if (selector_obj){
4045  Info("HandleProcess", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4046  fPlayer->Process(dset, selector_obj, opt, nentries, first);
4047  }
4048  else {
4049  Info("HandleProcess", "calling fPlayer->Process() with selector name: %s", filename.Data());
4050  fPlayer->Process(dset, filename, opt, nentries, first);
4051  }
4052 
4053  // Return number of events processed
      // NOTE(review): embedded lines 4054-4055 (which must set up the message
      // `m` and `abort` used below) and 4058 (the right-hand side creating
      // `status`) are missing from this listing.
4056  if (fProtocol > 18) {
4057  TProofProgressStatus* status =
4059  gPerfStats?gPerfStats->GetBytesRead():0);
4060  if (status)
4061  m << status << abort;
      // NOTE(review): `status` is null-checked just above but dereferenced
      // unconditionally below when `slb` is set - possible null dereference;
      // confirm whether `status` can actually be 0 here.
4062  if (slb)
4063  slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
4064  status->GetEntries(), status->GetBytesRead());
4065  SafeDelete(status);
4066  } else {
4067  m << fPlayer->GetEventsProcessed() << abort;
4068  if (slb)
4069  slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
4070  }
4071 
4072  fSocket->Send(m);
4073  PDB(kGlobal, 2)
4074  Info("TProofServ::Handleprocess",
4075  "worker %s has finished processing with %d objects in output list",
4077 
4078  // Cleanup the input data set info
4079  SafeDelete(dset);
4080  SafeDelete(enl);
4081  SafeDelete(evl);
4082 
      // NOTE(review): the definition of `outok` (embedded lines 4083-4084) is
      // missing from this listing.
4085  if (outok) {
4086  // Check if in controlled output sending mode or submerging
4087  Int_t cso = 0;
4088  Bool_t isSubMerging = kFALSE;
4089 
4090  // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4091  Int_t nm = 0;
4092  if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
4093  isSubMerging = (nm >= 0) ? kTRUE : kFALSE;
4094  }
      // Controlled sending is considered only when not sub-merging; the input
      // parameter takes precedence over the environment setting
4095  if (!isSubMerging) {
4096  cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4097  if (TProof::GetParameter(input, "PROOF_ControlSendOutput", cso) != 0)
4098  cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4099  }
4100 
4101  if (cso > 0) {
4102 
4103  // Control output sending mode: wait for the master to ask for the objects.
4104  // Allows controls of memory usage on the master.
      // NOTE(review): the construction of `msg` (embedded line 4105) is
      // missing from this listing.
4106  fSocket->Send(msg);
4107 
4108  // Set idle
4109  SetIdle(kTRUE);
4110 
4111  // Do not cleanup the player yet: it will be used in sending output activities
4112  deleteplayer = kFALSE;
4113 
4114  PDB(kGlobal, 1)
4115  Info("HandleProcess", "controlled mode: worker %s has finished,"
4116  " sizes sent to master", fOrdinal.Data());
4117  } else {
4118 
4119  // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
      // NOTE(review): the condition opening this if/else (embedded line 4120)
      // is missing from this listing.
4121  if (isSubMerging)
4122  Info("HandleProcess", "submerging disabled because of high-memory case");
4123  isSubMerging = kFALSE;
4124  } else {
4125  PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isSubMerging);
4126  }
4127 
4128  if (!IsMaster() && isSubMerging) {
4129  // Worker in merging mode.
4130  //----------------------------
4131  // First, it reports only the size of its output to the master
4132  // + port on which it can possibly accept outputs from other workers if it becomes a merger
4133  // Master will later tell it where it should send the output (either to the master or to some merger)
4134  // or if it should become a merger
4135 
4136  TMessage msg_osize(kPROOF_SUBMERGER);
4137  msg_osize << Int_t(TProof::kOutputSize);
4138  msg_osize << fPlayer->GetOutputList()->GetEntries();
4139 
      // Port 0 is requested; the port actually in use is read back below
4140  fMergingSocket = new TServerSocket(0);
4141  Int_t merge_port = 0;
4142  if (fMergingSocket) {
4143  PDB(kGlobal, 2)
4144  Info("HandleProcess", "possible port for merging connections: %d",
4146  merge_port = fMergingSocket->GetLocalPort();
4147  }
4148  msg_osize << merge_port;
4149  fSocket->Send(msg_osize);
4150 
4151  // Set idle
4152  SetIdle(kTRUE);
4153 
4154  // Do not cleanup the player yet: it will be used in sub-merging activities
4155  deleteplayer = kFALSE;
4156 
4157  PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
4158 
4159  } else {
4160  // Sub-master OR worker not in merging mode
4161  // ---------------------------------------------
4162  PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
4163  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
4164  Warning("HandleProcess","problems sending output list");
4165 
4166  // Masters reset the mergers, if any
4167  if (IsMaster()) fProof->ResetMergers();
4168 
4169  // Signal the master that we are idle
      // NOTE(review): the statement doing the signalling (embedded line 4170)
      // is missing from this listing.
4171 
4172  // Set idle
4173  SetIdle(kTRUE);
4174 
4175  // Notify the user
4176  SendLogFile();
4177  }
4178 
4179 
4180 
4181  }
4182 
4183  } else {
4184  // No output list
      // NOTE(review): the condition guarding the warning below (embedded line
      // 4185) is missing from this listing.
4186  Warning("HandleProcess","the output list is empty!");
4187  if (SendResults(fSocket) != 0)
4188  Warning("HandleProcess", "problems sending output list");
4189 
4190  // Masters reset the mergers, if any
4191  if (IsMaster()) fProof->ResetMergers();
4192 
4193  // Signal the master that we are idle
      // NOTE(review): the statement doing the signalling (embedded line 4194)
      // is missing from this listing.
4195 
4196  // Set idle
4197  SetIdle(kTRUE);
4198 
4199  // Notify the user
4200  SendLogFile();
4201  }
4202 
4203  // Prevent from double-deleting in input
4204  TIter nex(input);
4205  while ((obj = nex())) {
4206  if (obj->InheritsFrom("TSelector")) input->Remove(obj);
4207  }
4208 
4209  // Make also sure the input list objects are deleted
      // NOTE(review): SetOwner(0) is passed for the player's input list;
      // presumably the objects are then deleted once, through `input`
      // (input->SetOwner() + SafeDelete(input) below) - confirm.
4210  fPlayer->GetInputList()->SetOwner(0);
4211 
4212  // Remove possible inputs from a file and the file, if any
4213  TList *added = dynamic_cast<TList *>(input->FindObject("PROOF_InputObjsFromFile"));
4214  if (added) {
4215  if (added->GetSize() > 0) {
4216  // The file must be the last one
4217  TFile *f = dynamic_cast<TFile *>(added->Last());
4218  if (f) {
      // Take the file-backed objects (and the helper list itself) out of
      // `input` before closing and deleting the file
4219  added->Remove(f);
4220  TIter nxo(added);
4221  while ((o = nxo())) { input->Remove(o); }
4222  input->Remove(added);
4223  added->SetOwner(kFALSE);
4224  added->Clear();
4225  f->Close();
4226  delete f;
4227  }
4228  }
4229  SafeDelete(added);
4230  }
4231  input->SetOwner();
4232  SafeDelete(input);
4233 
4234  // Cleanup if required
4235  if (deleteplayer) DeletePlayer();
4236  }
4237 
4238  PDB(kGlobal, 1) Info("HandleProcess", "done");
4239 
4240  // Done
4241  return;
4242 }
4243 
4244 ////////////////////////////////////////////////////////////////////////////////
4245 /// Sends all objects from the given list to the specified socket
     /// Four strategies are used, depending on the protocol and the role:
     ///  - fProtocol > 23 and a list: objects are packed into messages which are
     ///    sent whenever their length exceeds fMsgSizeHWM;
     ///  - fProtocol > 10 and a list: one message per object;
     ///  - top master with fProtocol > 6 and a list: a single message carrying
     ///    the query result `pq`;
     ///  - otherwise: the list (possibly null, to notify failure or abort) is
     ///    sent as one kPROOF_OUTPUTLIST object.
     /// In the first two modes each object is preceded by an Int_t tag: 1, or 2
     /// for the last object; a top master with a non-null `pq` first sends the
     /// query info with tag 0.
     /// Returns 0 on success, -1 as soon as a send on `sock` fails.
     /// NOTE(review): the signature and every declaration / set-up line of the
     /// message `mbuf` are missing from this listing (embedded numbers 4247,
     /// 4256, 4286, 4312, 4336, 4365 and 4390 are absent).
4246 
4248 {
4249  PDB(kOutput, 2) Info("SendResults", "enter");
4250 
4251  TString msg;
4252  if (fProtocol > 23 && outlist) {
4253  // Send objects in bunches of max fMsgSizeHWM bytes to optimize transfer
4254  // Objects are merged one-by-one by the client
4255  // Messages for objects
4257  // Objects in the output list
4258  Int_t olsz = outlist->GetSize();
4259  if (IsTopMaster() && pq) {
4260  msg.Form("%s: merging output objects ... done ",
4261  fPrefix.Data());
4262  SendAsynMessage(msg.Data());
4263  // Message for the client
4264  msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4265  SendAsynMessage(msg.Data(), kFALSE);
4266  // Send light query info
4267  mbuf << (Int_t) 0;
4268  mbuf.WriteObject(pq);
4269  if (sock->Send(mbuf) < 0) return -1;
4270  }
4271  // Objects in the output list
      // `ns` counts the objects packed so far, `np` those in the message
      // currently being filled
4272  Int_t ns = 0, np = 0;
4273  TIter nxo(outlist);
4274  TObject *o = 0;
4275  Int_t totsz = 0, objsz = 0;
4276  mbuf.Reset();
4277  while ((o = nxo())) {
      // Flush the message once it has grown beyond the high-water mark
4278  if (mbuf.Length() > fMsgSizeHWM) {
4279  PDB(kOutput, 1)
4280  Info("SendResults",
4281  "message has %d bytes: limit of %lld bytes reached - sending ...",
4282  mbuf.Length(), fMsgSizeHWM);
4283  // Compress the message, if required; for these messages we do it already
4284  // here so we get the size; TXSocket does not do it twice.
4285  if (GetCompressionLevel() > 0) {
4287  mbuf.Compress();
4288  objsz = mbuf.CompLength();
4289  } else {
4290  objsz = mbuf.Length();
4291  }
4292  totsz += objsz;
4293  if (IsTopMaster()) {
4294  msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4295  fPrefix.Data(), ns, olsz, objsz);
4296  SendAsynMessage(msg.Data(), kFALSE);
4297  }
4298  if (sock->Send(mbuf) < 0) return -1;
4299  // Reset the message
4300  mbuf.Reset();
4301  np = 0;
4302  }
4303  ns++;
4304  np++;
      // Tag: 2 marks the last object of the list, 1 any other
4305  mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
4306  mbuf << o;
4307  }
      // Send whatever is left in the last, partially filled message
4308  if (np > 0) {
4309  // Compress the message, if required; for these messages we do it already
4310  // here so we get the size; TXSocket does not do it twice.
4311  if (GetCompressionLevel() > 0) {
4313  mbuf.Compress();
4314  objsz = mbuf.CompLength();
4315  } else {
4316  objsz = mbuf.Length();
4317  }
4318  totsz += objsz;
4319  if (IsTopMaster()) {
4320  msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4321  fPrefix.Data(), ns, olsz, objsz);
4322  SendAsynMessage(msg.Data(), kFALSE);
4323  }
4324  if (sock->Send(mbuf) < 0) return -1;
4325  }
4326  if (IsTopMaster()) {
4327  // Send total size
4328  msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4329  fPrefix.Data(), olsz, totsz);
4330  SendAsynMessage(msg.Data());
4331  }
4332  } else if (fProtocol > 10 && outlist) {
4333 
4334  // Send objects one-by-one to optimize transfer and merging
4335  // Messages for objects
4337  // Objects in the output list
4338  Int_t olsz = outlist->GetSize();
4339  if (IsTopMaster() && pq) {
4340  msg.Form("%s: merging output objects ... done ",
4341  fPrefix.Data());
4342  SendAsynMessage(msg.Data());
4343  // Message for the client
4344  msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4345  SendAsynMessage(msg.Data(), kFALSE);
4346  // Send light query info
4347  mbuf << (Int_t) 0;
4348  mbuf.WriteObject(pq);
4349  if (sock->Send(mbuf) < 0) return -1;
4350  }
4351 
4352  Int_t ns = 0;
4353  Int_t totsz = 0, objsz = 0;
      // NOTE(review): this branch iterates fPlayer->GetOutputList() while the
      // size `olsz` (and the branch above) use `outlist`; if a caller passes a
      // different list the "last object" tag and the counts would be wrong -
      // confirm whether `outlist` was intended here.
4354  TIter nxo(fPlayer->GetOutputList());
4355  TObject *o = 0;
4356  while ((o = nxo())) {
4357  ns++;
4358  mbuf.Reset();
      // Tag: 2 marks the last object of the list, 1 any other
4359  Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
4360  mbuf << type;
4361  mbuf.WriteObject(o);
4362  // Compress the message, if required; for these messages we do it already
4363  // here so we get the size; TXSocket does not do it twice.
4364  if (GetCompressionLevel() > 0) {
4366  mbuf.Compress();
4367  objsz = mbuf.CompLength();
4368  } else {
4369  objsz = mbuf.Length();
4370  }
4371  totsz += objsz;
4372  if (IsTopMaster()) {
4373  msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4374  fPrefix.Data(), ns, olsz, objsz);
4375  SendAsynMessage(msg.Data(), kFALSE);
4376  }
4377  if (sock->Send(mbuf) < 0) return -1;
4378  }
4379  // Total size
4380  if (IsTopMaster()) {
4381  // Send total size
4382  msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4383  fPrefix.Data(), olsz, totsz);
4384  SendAsynMessage(msg.Data());
4385  }
4386 
4387  } else if (IsTopMaster() && fProtocol > 6 && outlist) {
4388 
4389  // Buffer to be sent
4391  mbuf.WriteObject(pq);
4392  // Sizes
4393  Int_t blen = mbuf.CompLength();
4394  Int_t olsz = outlist->GetSize();
4395  // Message for the client
4396  msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
4397  SendAsynMessage(msg.Data(), kFALSE);
4398  if (sock->Send(mbuf) < 0) return -1;
4399 
4400  } else {
      // Fallback: also reached with a null `outlist`, in which case the peer is
      // notified of the failure / abort
4401  if (outlist) {
4402  PDB(kGlobal, 2) Info("SendResults", "sending output list");
4403  } else {
4404  PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
4405  }
4406  if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
4407  }
4408 
4409  PDB(kOutput,2) Info("SendResults", "done");
4410 
4411  // Done
4412  return 0;
4413 }
4414 
4415 ////////////////////////////////////////////////////////////////////////////////
4416 /// Process the next query from the queue of submitted jobs.
4417 /// To be called on the top master only.
     ///
     /// The query is obtained from NextQuery(); its options, input list, entry
     /// range and selector name are read from the query result object. A player
     /// is created with MakePlayer(), the input objects are handed to it, and
     /// Process() is called either with a TSelector object found in the input
     /// list or with the selector file name. The output list is then sent back
     /// on fSocket through SendResults().
     ///
     /// When 'slb' is non-null it is filled with a short summary: exit status,
     /// entries, bytes and used CPU time (entries and bytes are written as -1
     /// in the empty-output branch).
     ///
     /// NOTE(review): the embedded line numbering of this listing has gaps (the
     /// function signature and a few statements are not visible here); the
     /// comments below describe only the statements that are visible.
4418 
4420 {
4421  TDSet *dset = 0;
4422  TString filename, opt;
4423  TList *input = 0;
4424  Long64_t nentries = -1, first = 0;
4425 
4426  // TObject *elist = 0;
4427  TProofQueryResult *pq = 0;
4428 
4429  TObject* obj = 0;
4430  TSelector* selector_obj = 0;
4431 
4432  // Process
4433 
4434  // Reset compute stopwatch: we include everything done from now on
4435  fCompute.Reset();
4436  fCompute.Start();
4437 
4438  // Get next query info (also removes query from the list)
4439  pq = NextQuery();
4440  if (pq) {
4441 
4442  // Set not idle
4443  SetIdle(kFALSE);
4444  opt = pq->GetOptions();
4445  input = pq->GetInputList();
4446  nentries = pq->GetEntries();
4447  first = pq->GetFirst();
      // NOTE(review): GetSelecImp() is dereferenced here (and again further down)
      // without a null check, although it is tested for null before the selector
      // files are expanded below -- confirm it can never be null at this point.
4448  filename = pq->GetSelecImp()->GetName();
      // Whatever follows the last '#' in the options is appended to the selector
      // file name and stripped from the options passed on.
4449  Ssiz_t id = opt.Last('#');
4450  if (id != kNPOS && id < opt.Length() - 1) {
4451  filename += opt(id + 1, opt.Length());
4452  // Remove it from 'opt' so users find on the workers what they specified
4453  opt.Remove(id);
4454  }
4455  // Attach to data set and entry- (or event-) list (if any)
4456  TObject *o = 0;
4457  if ((o = pq->GetInputObject("TDSet"))) {
4458  dset = (TDSet *) o;
4459  } else {
4460  // Should never get here
      // NOTE(review): returns after SetIdle(kFALSE) and after the query was taken
      // off the waiting list -- confirm the caller restores the idle state.
4461  Error("ProcessNext", "no TDset object: cannot continue");
4462  return;
4463  }
4464  // elist = 0;
4465  // if ((o = pq->GetInputObject("TEntryList")))
4466  // elist = o;
4467  // else if ((o = pq->GetInputObject("TEventList")))
4468  // elist = o;
4469 
4470  // Expand selector files: run kRM on the file name, then save the source
      // under that same name
4471  if (pq->GetSelecImp()) {
4472  gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
4473  pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
4474  }
      // The header is skipped when its name contains "TProofDrawHist"
4475  if (pq->GetSelecHdr() &&
4476  !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
4477  gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
4478  pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
4479  }
4480 
4481  // Taking out a TSelector object from input list: the first input object
      // inheriting from TSelector whose class name equals the selector name
4482  TIter nxt(input);
4483  while ((obj = nxt())){
4484  if (obj->InheritsFrom("TSelector") &&
4485  !strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
4486  selector_obj = (TSelector *) obj;
4487  Info("ProcessNext", "found object for selector '%s'", obj->ClassName());
4488  break;
4489  }
4490  }
4491 
4492  } else {
4493  // Should never get here
4494  Error("ProcessNext", "empty waiting queries list!");
4495  return;
4496  }
4497 
4498  // Set in running state
4499  SetQueryRunning(pq);
4500 
4501  // Save to queries dir, if not standard draw
4502  if (fQMgr) {
4503  if (!(pq->IsDraw()))
4504  fQMgr->SaveQuery(pq);
4505  else
4507  fQMgr->ResetTime();
4508  }
4509 
4510  // Signal the client that we are starting a new query
      // (selector name, number of files in the data set, first entry, entries)
4512  m << TString(pq->GetSelecImp()->GetName())
4513  << dset->GetNumOfFiles()
4514  << pq->GetFirst() << pq->GetEntries();
4515  fSocket->Send(m);
4516 
4517  // Create player
4518  MakePlayer();
4519 
4520  // Add query results to the player lists
4521  fPlayer->AddQueryResult(pq);
4522 
4523  // Set query currently processed
4524  fPlayer->SetCurrentQuery(pq);
4525 
4526  // Setup data set
4527  if (dset->IsA() == TDSetProxy::Class())
4528  ((TDSetProxy*)dset)->SetProofServ(this);
4529 
4530  // Add the unique query tag as TNamed object to the input list
4531  // so that it is available in TSelectors for monitoring
4532  TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
4533  input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
4534  // ... and the sequential number
4535  fQuerySeqNum = pq->GetSeqNum();
4536  input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
4537 
4538  // Check whether we have to enforce the use of submergers, but only if the user did
4539  // not express itself on the subject
4540  if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
4541  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
4542  if (smg >= 0) {
4543  input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
4544  PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
4545  if (gEnv->Lookup("Proof.MergersByHost")) {
4546  Int_t mbh = gEnv->GetValue("Proof.MergersByHost", 0);
4547  if (mbh != 0) {
4548  // Administrator settings have the priority: any existing
      // "PROOF_MergersByHost" input object is removed and deleted first
4549  TObject *o = 0;
4550  if ((o = input->FindObject("PROOF_MergersByHost"))) { input->Remove(o); delete o; }
4551  input->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
4552  PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
4553  }
4554  }
4555  }
4556  }
4557 
4558  // Set input
4559  TIter next(input);
4560  TObject *o = 0;
4561  while ((o = next())) {
4562  PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
4563  fPlayer->AddInput(o);
4564  }
4565 
4566  // Remove the list of the missing files from the original list, if any
      // (it is removed from 'input' only; it was already passed to the player above)
4567  if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
4568 
4569  // Process: with the selector object when one was found in the input list,
      // otherwise by selector file name
4570  PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
4571  if (selector_obj){
4572  Info("ProcessNext", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4573  fPlayer->Process(dset, selector_obj, opt, nentries, first);
4574  }
4575  else {
4576  Info("ProcessNext", "calling fPlayer->Process() with selector name: %s", filename.Data());
4577  fPlayer->Process(dset, filename, opt, nentries, first);
4578  }
4579 
4580  // This is the end of merging
4582 
4583  // Return number of events processed
4584  Bool_t abort =
4587  m.Reset(kPROOF_STOPPROCESS);
4588  // message sent from worker to the master
4589  if (fProtocol > 18) {
4591  m << status << abort;
4592  status = 0; // the status belongs to the player.
4593  } else if (fProtocol > 8) {
4594  m << fPlayer->GetEventsProcessed() << abort;
4595  } else {
4597  }
4598  fSocket->Send(m);
4599  }
4600 
4601  // Register any dataset produced during this processing, if required
4603  TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
4604  if (psr) {
4605  TString emsg;
4606  if (RegisterDataSets(input, fPlayer->GetOutputList(), fDataSetManager, emsg) != 0)
4607  Warning("ProcessNext", "problems registering produced datasets: %s", emsg.Data());
      // Remove and delete every "PROOFSERV_RegisterDataSet" marker from the output list
4608  do {
4609  fPlayer->GetOutputList()->Remove(psr);
4610  delete psr;
4611  } while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet")));
4612  }
4613  }
4614 
4615  // Complete filling of the TQueryResult instance
4616  if (fQMgr && !pq->IsDraw()) {
4617  if (!abort) fProof->AskStatistics();
4618  if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
4619  fQMgr->SaveQuery(pq, fMaxQueries);
4620  }
4621 
4622  // If we were requested to save results on the master and we are not in save-to-file mode
4623  // then we save the results
4624  if (IsTopMaster() && fPlayer->GetOutputList()) {
      // Saving is skipped as soon as one output object is a TProofOutputFile
      // with the kSwapFile bit set
4625  Bool_t save = kTRUE;
4626  TIter nxo(fPlayer->GetOutputList());
4627  TObject *xo = 0;
4628  while ((xo = nxo())) {
4629  if (xo->InheritsFrom("TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
4630  save = kFALSE;
4631  break;
4632  }
4633  }
4634  if (save) {
      // The output path is the title of "PROOF_DefaultOutputOption" with the
      // leading "of:" stripped
4635  TNamed *nof = (TNamed *) input->FindObject("PROOF_DefaultOutputOption");
4636  if (nof) {
4637  TString oopt(nof->GetTitle());
4638  if (oopt.BeginsWith("of:")) {
4639  oopt.Replace(0, 3, "");
4640  if (!oopt.IsNull()) fPlayer->SetOutputFilePath(oopt);
4642  }
4643  }
4644  }
4645  }
4646 
4647  // Send back the results
4648  TQueryResult *pqr = pq->CloneInfo();
4649  // At least the TDSet name in the light object
4650  Info("ProcessNext", "adding info about dataset '%s' in the light query result", dset->GetName());
      // NOTE(review): 'ds' is handed to 'pqr' only when pqr is non-null; presumably
      // SetInputList(..., kTRUE) takes over the content of 'rin' -- confirm who
      // owns 'ds' when CloneInfo() returned null.
4651  TList rin;
4652  TDSet *ds = new TDSet(dset->GetName(), dset->GetObjName());
4653  rin.Add(ds);
4654  if (pqr) pqr->SetInputList(&rin, kTRUE);
4656  PDB(kGlobal, 2)
4657  Info("ProcessNext", "sending results");
      // The light clone is sent only for protocol > 10; otherwise the full query result
4658  TQueryResult *xpq = (pqr && fProtocol > 10) ? pqr : pq;
4659  if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
4660  Warning("ProcessNext", "problems sending output list");
4661  if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
4662  pq->GetBytes(), pq->GetUsedCPU());
4663  } else {
4665  Warning("ProcessNext","the output list is empty!");
4666  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
4667  Warning("ProcessNext", "problems sending output list");
4668  if (slb) slb->Form("%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
4669  }
4670 
4671  // Remove aborted queries from the list
4673  SafeDelete(pqr);
4674  if (fQMgr) fQMgr->RemoveQuery(pq);
4675  } else {
4676  // Keep in memory only light info about a query
4677  if (!(pq->IsDraw()) && pqr) {
4678  if (fQMgr && fQMgr->Queries()) {
4679  fQMgr->Queries()->Add(pqr);
4680  // Remove from the fQueries list
4681  fQMgr->Queries()->Remove(pq);
4682  }
4683  // This removes 'pq' from the internal player list and
4684  // deletes it; in this way we do not attempt a double delete
4685  // when destroying the player
4687  pq->GetTitle(), pq->GetName()));
4688  }
4689  }
4690 
4691  DeletePlayer();
4692  if (IsMaster() && fProof->UseDynamicStartup())
4693  // stop the workers
4694  fProof->RemoveWorkers(0);
4695 }
4696 
4697 ////////////////////////////////////////////////////////////////////////////////
4698 /// Register TFileCollections in 'out' as datasets according to the rules in 'in'
     ///
     /// Each TFileCollection found in 'out' is considered only if 'out' also holds
     /// an object named "DATASET_<collection name>"; the title of that object is
     /// used as the register option. On exit, duplicate collections (same tag as
     /// one already registered) and the tag objects of registered datasets are
     /// removed from 'out'.
     ///
     /// 'msg' is reset at the start and overwritten for each processed dataset,
     /// so on return it holds the message of the last one only.
     ///
     /// Returns 0 when done and also when any of 'in', 'out', 'dsm' is null (an
     /// error is printed in that case); returns -1 from the "dataset registration
     /// not allowed" branch.
     ///
     /// NOTE(review): in the visible code 'in' is only checked for null. The first
     /// line of the signature and the condition paired with the "not allowed"
     /// 'else' are not visible in this listing.
4699 
4701  TDataSetManager *dsm, TString &msg)
4702 {
4703  PDB(kDataset, 1)
4704  ::Info("TProofServ::RegisterDataSets",
4705  "enter: %d objs in the output list", (out ? out->GetSize() : -1));
4706 
4707  if (!in || !out || !dsm) {
4708  ::Error("TProofServ::RegisterDataSets", "invalid inputs: %p, %p, %p", in, out, dsm);
4709  return 0;
4710  }
4711  msg = "";
      // 'tags' collects the tags of successfully registered datasets; 'torm'
      // collects duplicate collections to be removed from 'out' at the end
4712  THashList tags;
4713  TList torm;
4714  TIter nxo(out);
4715  TObject *o = 0;
4716  while ((o = nxo())) {
4717  // Only file collections TFileCollection
4718  TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
4719  if (ds) {
4720  // Origin of this dataset
4721  ds->SetTitle(gSystem->HostName());
4722  // The tag and register option
4723  TNamed *fcn = 0;
4724  TString tag = TString::Format("DATASET_%s", ds->GetName());
      // Collections without a matching tag object are left untouched
4725  if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
4726  // If this tag is in the list of processed tags, flag it for removal
4727  if (tags.FindObject(tag)) {
4728  torm.Add(o);
4729  continue;
4730  }
4731  // Register option
4732  TString regopt(fcn->GetTitle());
4733  // Sort according to the internal index, if required
4734  if (regopt.Contains(":sortidx:")) {
4735  ds->Sort(kTRUE);
4736  regopt.ReplaceAll(":sortidx:", "");
4737  }
4738  // Register this dataset
4740  // Extract the list
4741  if (ds->GetList()->GetSize() > 0) {
4742  // Register the dataset (quota checks are done inside here)
4743  const char *vfmsg = regopt.Contains("V") ? " and verifying" : "";
4744  msg.Form("Registering%s dataset '%s' ... ", vfmsg, ds->GetName());
4745  // Always allow verification for this action: the kAllowVerify bit is
      // set for the duration of the call when the option contains "V" and
      // the bit was not already set
4746  Bool_t allowVerify = dsm->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
4747  if (regopt.Contains("V") && !allowVerify) dsm->SetBit(TDataSetManager::kAllowVerify);
4748  // Main action
4749  Int_t rc = dsm->RegisterDataSet(ds->GetName(), ds, regopt);
4750  // Reset to the previous state if needed
4751  if (regopt.Contains("V") && !allowVerify) dsm->ResetBit(TDataSetManager::kAllowVerify);
4752  if (rc != 0) {
4753  ::Warning("TProofServ::RegisterDataSets",
4754  "failure registering or verifying dataset '%s'", ds->GetName());
4755  msg.Form("Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->GetName());
4756  } else {
4757  ::Info("TProofServ::RegisterDataSets", "dataset '%s' successfully registered%s",
4758  ds->GetName(), (strlen(vfmsg) > 0) ? " and verified" : "");
4759  msg.Form("Registering%s dataset '%s' ... OK", vfmsg, ds->GetName());
4760  // Add tag to the list of processed tags to avoid double processing
4761  // (there may be more objects with the same name, created by each worker)
4762  tags.Add(new TObjString(tag));
4763  }
4764  // Notify
4765  PDB(kDataset, 2) {
4766  ::Info("TProofServ::RegisterDataSets", "printing collection");
4767  ds->Print("F");
4768  }
4769  } else {
4770  ::Warning("TProofServ::RegisterDataSets", "collection '%s' is empty", o->GetName());
4771  }
4772  } else {
4773  ::Info("TProofServ::RegisterDataSets", "dataset registration not allowed");
4774  return -1;
4775  }
4776  }
4777  }
4778  // Cleanup all temporary stuff possibly created by each worker: the duplicates
      // are taken out of 'out' and 'torm' is made owner of them
4779  TIter nxrm(&torm);
4780  while ((o = nxrm())) out->Remove(o);
4781  torm.SetOwner(kTRUE);
4782  // Remove tags: every object in 'out' named like a processed tag is removed
      // from 'out' (removed only; the loop does not delete it)
4783  TIter nxtg(&tags);
4784  while((o = nxtg())) {
4785  TObject *oo = 0;
4786  while ((oo = out->FindObject(o->GetName()))) { out->Remove(oo); }
4787  }
4788  tags.SetOwner(kTRUE);
4789 
4790  PDB(kDataset, 1) ::Info("TProofServ::RegisterDataSets", "exit");
4791  // Done
4792  return 0;
4793 }
4794 
4795 ////////////////////////////////////////////////////////////////////////////////
4796 /// Handle request for list of queries.
     ///
     /// Reads the Bool_t 'all' from 'mess'. When a query manager is available the
     /// reply list is filled with:
     ///  - if 'all' is set, the queries of previous sessions, rescanned from
     ///    fQueryDir truncated at "session-";
     ///  - a light clone (CloneInfo()) of each query of the current session.
     /// Sequence numbers are assigned in list order. The number of previous
     /// queries, the number of draw queries and the list are then streamed into
     /// the reply message and sent on fSocket.
     ///
     /// The clones created here are owned by this function and are deleted once
     /// the reply has been sent; the previous-session entries are not deleted
     /// here, they are only referenced by the reply list.
4797 
4799 {
4800  PDB(kGlobal, 1)
4801  Info("HandleQueryList", "Enter");
4802 
4803  Bool_t all;
4804  (*mess) >> all;
4805 
4806  TList *ql = new TList;
      // 'ql' does not own its content, so the light clones added below are also
      // tracked in this owning list, which deletes them when it goes out of
      // scope (after the message has been sent and 'ql' has been deleted).
      TList clones;
      clones.SetOwner(kTRUE);
4807  Int_t ntot = 0, npre = 0, ndraw= 0;
4808  if (fQMgr) {
4809  if (all) {
4810  // Rescan
4811  TString qdir = fQueryDir;
4812  Int_t idx = qdir.Index("session-");
4813  if (idx != kNPOS)
4814  qdir.Remove(idx);
4815  fQMgr->ScanPreviousQueries(qdir);
4816  // Send also information about previous queries, if any
4817  if (fQMgr->PreviousQueries()) {
4818  TIter nxq(fQMgr->PreviousQueries());
4819  TProofQueryResult *pqr = 0;
4820  while ((pqr = (TProofQueryResult *)nxq())) {
4821  ntot++;
4822  pqr->fSeqNum = ntot;
4823  ql->Add(pqr);
4824  }
4825  }
4826  }
4827 
4828  npre = ntot;
4829  if (fQMgr->Queries()) {
4830  // Add info about queries in this session
4831  TIter nxq(fQMgr->Queries());
4832  TProofQueryResult *pqr = 0;
4833  TQueryResult *pqm = 0;
4834  while ((pqr = (TProofQueryResult *)nxq())) {
4835  ntot++;
4836  if ((pqm = pqr->CloneInfo())) {
4837  pqm->fSeqNum = ntot;
4838  ql->Add(pqm);
      // Keep track of the clone so that it is released on exit
      clones.Add(pqm);
4839  } else {
4840  Warning("HandleQueryList", "unable to clone TProofQueryResult '%s:%s'",
4841  pqr->GetName(), pqr->GetTitle());
4842  }
4843  }
4844  }
4845  // Number of draw queries
4846  ndraw = fQMgr->DrawQueries();
4847  }
4848 
4850  m << npre << ndraw << ql;
4851  fSocket->Send(m);
      // Deleting the non-owning list leaves its content alone; the clones are
      // released by 'clones' on return
4852  delete ql;
4853 
4854  // Done
4855  return;
4856 }
4857 
4858 ////////////////////////////////////////////////////////////////////////////////
4859 /// Handle remove request.
     ///
     /// Reads the query reference string from 'mess' and, when 'slb' is non-null,
     /// copies it there. Three kinds of request are handled:
     ///  - "cleanupqueue": the waiting queries are removed via
     ///    CleanupWaitingQueries();
     ///  - "cleanupdir": the results of previous sessions are removed via
     ///    fQMgr->CleanupQueriesDir() (reported as -1 when there is no manager);
     ///  - anything else is taken as a query reference: the session is locked,
     ///    the query is removed from the manager and from the waiting list, and
     ///    the lock file is unlinked.
     /// Nothing is sent back on the socket by the visible code; the outcome is
     /// only logged.
4860 
4862 {
4863  PDB(kGlobal, 1)
4864  Info("HandleRemove", "Enter");
4865 
4866  TString queryref;
4867  (*mess) >> queryref;
4868 
4869  if (slb) *slb = queryref;
4870 
4871  if (queryref == "cleanupqueue") {
4872  // Remove pending requests
4873  Int_t pend = CleanupWaitingQueries();
4874  // Notify
4875  Info("HandleRemove", "%d queries removed from the waiting list", pend);
4876  // We are done
4877  return;
4878  }
4879 
4880  if (queryref == "cleanupdir") {
4881 
4882  // Cleanup previous sessions results
4883  Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
4884 
4885  // Notify
4886  Info("HandleRemove", "%d directories removed", nd);
4887  // We are done
4888  return;
4889  }
4890 
4891 
4892  if (fQMgr) {
4893  TProofLockPath *lck = 0;
4894  if (fQMgr->LockSession(queryref, &lck) == 0) {
4895 
4896  // Remove query; the removed entries are collected in 'qtorm' and then
      // used to clean the waiting list as well
4897  TList qtorm;
4898  fQMgr->RemoveQuery(queryref, &qtorm);
4899  CleanupWaitingQueries(kFALSE, &qtorm);
4900 
4901  // Unlock and remove the lock file
4902  if (lck) {
4903  gSystem->Unlink(lck->GetName());
4904  SafeDelete(lck);
4905  }
4906 
4907  // We are done
4908  return;
4909  }
4910  } else {
4911  Warning("HandleRemove", "query result manager undefined!");
4912  }
4913 
4914  // Notify failure
      // NOTE(review): this point is also reached when fQMgr is undefined, in which
      // case the "unable to lock session" wording does not describe the real cause.
4915  Info("HandleRemove",
4916  "query %s could not be removed (unable to lock session)", queryref.Data());
4917 
4918  // Done
4919  return;
4920 }
4921 
4922 ////////////////////////////////////////////////////////////////////////////////
4923 /// Handle retrieve request.
4924 
4926 {
4927  PDB(kGlobal, 1)
4928  Info("HandleRetrieve", "Enter");
4929 
4930  TString queryref;
4931  (*mess) >> queryref;
4932 
4933  if (slb) *slb = queryref;
4934 
4935  // Parse reference string
4936  Int_t qry = -1;
4937  TString qdir;
4938  if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
4939 
4940  TString fout = qdir;
4941  fout += "/query-result.root";
4942 
4943  TFile *f = TFile::Open(fout,"READ");
4944  TProofQueryResult *pqr = 0;
4945  if (f) {
4946  f->ReadKeys();
4947  TIter nxk(f->GetListOfKeys());
4948  TKey *k = 0;
4949  while ((k = (TKey *)nxk())) {
4950  if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
4951  pqr = (TProofQueryResult *) f->Get(k->GetName());
4952  // For backward compatibility
4953  if (pqr && fProtocol < 13) {
4954  TDSet *d = 0;
4955  TObject *o = 0;
4956  TIter nxi(pqr->GetInputList());
4957  while ((o = nxi()))
4958  if ((d = dynamic_cast<TDSet *>(o)))
4959  break;
4960  d->SetWriteV3(kTRUE);
4961  }
4962  if (pqr) {
4963 
4964  // Message for the client
4965  Float_t qsz = (Float_t) f->GetSize();
4966  Int_t ilb = 0;
4967  static const char *clb[4] = { "bytes", "KB", "MB", "GB" };
4968  while (qsz > 1000. && ilb < 3) {
4969  qsz /= 1000.;
4970  ilb++;
4971  }
4972  SendAsynMessage(TString::Format("%s: sending result of %s:%s (%.1f %s)",
4973  fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
4974  qsz, clb[ilb]));
4976  } else {
4977  Info("HandleRetrieve",
4978  "query not found in file %s",fout.Data());
4979  // Notify not found
4981  }
4982  break;
4983  }
4984  }
4985  f->Close();
4986  delete f;
4987  } else {
4988  Info("HandleRetrieve",
4989  "file cannot be open (%s)",fout.Data());
4990  // Notify not found
4992  return;
4993  }
4994 
4995  // Done
4996  return;
4997 }
4998 
4999 ////////////////////////////////////////////////////////////////////////////////
5000 /// Handle lib, inc search paths modification request
5001 
5003 {
5004  TString type;
5005  Bool_t add;
5006  TString path;
5007  Int_t rc = 1;
5008  (*mess) >> type >> add >> path;
5009  if (mess->BufferSize() > mess->Length()) (*mess) >> rc;
5010 
5011  // Check type of action
5012  if ((type != "lib") && (type != "inc")) {
5013  Error("HandleLibIncPath","unknown action type: %s", type.Data());
5014  return rc;
5015  }
5016 
5017  // Separators can be either commas or blanks
5018  path.ReplaceAll(","," ");
5019 
5020  // Decompose lists
5021  TObjArray *op = 0;
5022  if (path.Length() > 0 && path != "-") {
5023  if (!(op = path.Tokenize(" "))) {
5024  Error("HandleLibIncPath","decomposing path %s", path.Data());
5025  return rc;
5026  }
5027  }
5028 
5029  if (add) {
5030 
5031  if (type == "lib") {
5032 
5033  // Add libs
5034  TIter nxl(op, kIterBackward);
5035  TObjString *lib = 0;
5036  while ((lib = (TObjString *) nxl())) {
5037  // Expand path
5038  TString xlib = lib->GetName();
5039  gSystem->ExpandPathName(xlib);
5040  // Add to the dynamic lib search path if it exists and can be read