ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TProofLite.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id: 7735e42a1b96a9f40ae76bd884acac883a178dee $
2 // Author: G. Ganis March 2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofLite
13 \ingroup proofkernel
14 
15 This class starts a PROOF session on the local machine: no daemons,
16 client and master merged, communications via UNIX-like sockets.
17 By default the number of workers started is NumberOfCores+1; a
18 different number can be forced on construction.
19 
20 */
21 
22 #include "TProofLite.h"
23 
24 #ifdef WIN32
25 # include <io.h>
26 # include "snprintf.h"
27 #endif
28 #include "RConfigure.h"
29 #include "TDSet.h"
30 #include "TEnv.h"
31 #include "TError.h"
32 #include "TFile.h"
33 #include "TFileCollection.h"
34 #include "TFileInfo.h"
35 #include "THashList.h"
36 #include "TMessage.h"
37 #include "TMonitor.h"
38 #include "TObjString.h"
39 #include "TPluginManager.h"
40 #include "TDataSetManager.h"
41 #include "TDataSetManagerFile.h"
42 #include "TParameter.h"
43 #include "TPRegexp.h"
44 #include "TProofQueryResult.h"
45 #include "TProofServ.h"
46 #include "TQueryResultManager.h"
47 #include "TROOT.h"
48 #include "TServerSocket.h"
49 #include "TSlave.h"
50 #include "TSortedList.h"
51 #include "TTree.h"
52 #include "TVirtualProofPlayer.h"
53 #include "TSelector.h"
54 
56 
57 Int_t TProofLite::fgWrksMax = -2; // Unitialized max number of workers
58 
59 ////////////////////////////////////////////////////////////////////////////////
60 /// Create a PROOF environment. Starting PROOF involves either connecting
61 /// to a master server, which in turn will start a set of slave servers, or
62 /// directly starting as master server (if master = ""). Masterurl is of
63 /// the form: [proof[s]://]host[:port]. Conffile is the name of the config
64 /// file describing the remote PROOF cluster (this argument alows you to
65 /// describe different cluster configurations).
66 /// The default is proof.conf. Confdir is the directory where the config
67 /// file and other PROOF related files are (like motd and noproof files).
68 /// Loglevel is the log level (default = 1). User specified custom config
69 /// files will be first looked for in $HOME/.conffile.
70 
71 TProofLite::TProofLite(const char *url, const char *conffile, const char *confdir,
72  Int_t loglevel, const char *alias, TProofMgr *mgr)
73 {
74  fUrl.SetUrl(url);
75 
76  // Default initializations
77  fServSock = 0;
78  fCacheLock = 0;
79  fQueryLock = 0;
80  fQMgr = 0;
81  fDataSetManager = 0;
82  fDataSetStgRepo = 0;
83  fReInvalid = new TPMERegexp("[^A-Za-z0-9._-]");
84  InitMembers();
85 
86  // This may be needed during init
87  fManager = mgr;
88 
89  // Default server type
90  fServType = TProofMgr::kProofLite;
91 
92  // Default query mode
93  fQueryMode = kSync;
94 
95  // Client and master are merged
96  fMasterServ = kTRUE;
97  if (fManager) SetBit(TProof::kIsClient);
98  SetBit(TProof::kIsMaster);
99 
100  // Flag that we are a client
101  if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
102 
103  // Protocol and Host
104  fUrl.SetProtocol("proof");
105  fUrl.SetHost("__lite__");
106  fUrl.SetPort(1093);
107 
108  // User
109  if (strlen(fUrl.GetUser()) <= 0) {
110  // Get user logon name
112  if (pw) {
113  fUrl.SetUser(pw->fUser);
114  delete pw;
115  }
116  }
117  fMaster = gSystem->HostName();
118 
119  // Analysise the conffile field
120  ParseConfigField(conffile);
121 
122  // Determine the number of workers giving priority to users request.
123  // Otherwise use the system information, if available, or just start
124  // the minimal number, i.e. 2 .
125  if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
126 
127  TString stup;
128  if (gProofServ) {
129  Int_t port = gEnv->GetValue("ProofServ.XpdPort", 1093);
130  stup.Form("%s @ %s:%d ", gProofServ->GetOrdinal(), gSystem->HostName(), port);
131  }
132  Printf(" +++ Starting PROOF-Lite %swith %d workers +++", stup.Data(), fNWorkers);
133  // Init the session now
134  Init(url, conffile, confdir, loglevel, alias);
135  }
136 
137  // For final cleanup
138  if (!gROOT->GetListOfProofs()->FindObject(this))
139  gROOT->GetListOfProofs()->Add(this);
140 
141  // Still needed by the packetizers: needs to be changed
142  gProof = this;
143 }
144 
145 ////////////////////////////////////////////////////////////////////////////////
146 /// Start the PROOF environment. Starting PROOF involves either connecting
147 /// to a master server, which in turn will start a set of slave servers, or
148 /// directly starting as master server (if master = ""). For a description
149 /// of the arguments see the TProof ctor. Returns the number of started
150 /// master or slave servers, returns 0 in case of error, in which case
151 /// fValid remains false.
152 
153 Int_t TProofLite::Init(const char *, const char *conffile,
154  const char *confdir, Int_t loglevel, const char *)
155 {
157 
158  fValid = kFALSE;
159 
160  // Connected to terminal?
161  fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
162 
163  if (TestBit(TProof::kIsMaster)) {
164  // Fill default conf file and conf dir
165  if (!conffile || !conffile[0])
167  if (!confdir || !confdir[0])
169  } else {
170  fConfDir = confdir;
171  fConfFile = conffile;
172  }
173 
174  // The sandbox for this session
175  if (CreateSandbox() != 0) {
176  Error("Init", "could not create/assert sandbox for this session");
177  return 0;
178  }
179 
180  // UNIX path for communication with workers
181  TString sockpathdir = gEnv->GetValue("ProofLite.SockPathDir", gSystem->TempDirectory());
182  if (sockpathdir.IsNull()) sockpathdir = gSystem->TempDirectory();
183  if (sockpathdir(sockpathdir.Length()-1) == '/') sockpathdir.Remove(sockpathdir.Length()-1);
184  fSockPath.Form("%s/plite-%d", sockpathdir.Data(), gSystem->GetPid());
185  if (fSockPath.Length() > 104) {
186  // Sort of hardcoded limit length for Unix systems
187  Error("Init", "Unix socket path '%s' is too long (%d bytes):",
189  Error("Init", "use 'ProofLite.SockPathDir' to create it under a directory different"
190  " from '%s'", sockpathdir.Data());
191  return 0;
192  }
193 
194  fLogLevel = loglevel;
197  fImage = "<local>";
198  fIntHandler = 0;
199  fStatus = 0;
200  fRecvMessages = new TList;
202  fSlaveInfo = 0;
203  fChains = new TList;
204  fAvailablePackages = 0;
205  fEnabledPackages = 0;
207  fInputData = 0;
209 
212 
213  // Timeout for some collect actions
214  fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
215 
216  // Should the workers be started dynamically; default: no
218  fDynamicStartupStep = -1;
219  fDynamicStartupNMax = -1;
220  TString dynconf = gEnv->GetValue("Proof.SimulateDynamicStartup", "");
221  if (dynconf.Length() > 0) {
223  fLastPollWorkers_s = time(0);
224  // Extract parameters
225  Int_t from = 0;
226  TString p;
227  if (dynconf.Tokenize(p, from, ":"))
228  if (p.IsDigit()) fDynamicStartupStep = p.Atoi();
229  if (dynconf.Tokenize(p, from, ":"))
230  if (p.IsDigit()) fDynamicStartupNMax = p.Atoi();
231  }
232 
233 
234  fProgressDialog = 0;
236 
237  // Client logging of messages from the workers
238  fRedirLog = kFALSE;
239  if (TestBit(TProof::kIsClient)) {
240  fLogFileName = Form("%s/session-%s.log", fWorkDir.Data(), GetName());
241  if ((fLogFileW = fopen(fLogFileName.Data(), "w")) == 0)
242  Error("Init", "could not create temporary logfile %s", fLogFileName.Data());
243  if ((fLogFileR = fopen(fLogFileName.Data(), "r")) == 0)
244  Error("Init", "could not open logfile %s for reading", fLogFileName.Data());
245  }
247 
250  TString(fCacheDir).ReplaceAll("/","%").Data()));
251 
252  // Create 'queries' locker instance and lock it
255  TString(fQueryDir).ReplaceAll("/","%").Data()));
256  fQueryLock->Lock();
257  // Create the query manager
260 
261  // Apply quotas, if any
262  Int_t maxq = gEnv->GetValue("ProofLite.MaxQueriesSaved", 10);
263  if (fQMgr && fQMgr->ApplyMaxQueries(maxq) != 0)
264  Warning("Init", "problems applying fMaxQueries");
265 
266  if (InitDataSetManager() != 0)
267  Warning("Init", "problems initializing the dataset manager");
268 
269  // Status of cluster
270  fNotIdle = 0;
271 
272  // Query type
273  fSync = kTRUE;
274 
275  // List of queries
276  fQueries = 0;
277  fOtherQueries = 0;
278  fDrawQueries = 0;
279  fMaxDrawQueries = 1;
280  fSeqNum = 0;
281 
282  // Remote ID of the session
283  fSessionID = -1;
284 
285  // Part of active query
286  fWaitingSlaves = 0;
287 
288  // Make remote PROOF player
289  fPlayer = 0;
290  MakePlayer("lite");
291 
292  fFeedback = new TList;
293  fFeedback->SetOwner();
294  fFeedback->SetName("FeedbackList");
296 
297  // Sort workers by descending performance index
299  fActiveSlaves = new TList;
300  fInactiveSlaves = new TList;
301  fUniqueSlaves = new TList;
302  fAllUniqueSlaves = new TList;
303  fNonUniqueMasters = new TList;
304  fBadSlaves = new TList;
305  fAllMonitor = new TMonitor;
306  fActiveMonitor = new TMonitor;
307  fUniqueMonitor = new TMonitor;
309  fCurrentMonitor = 0;
310  fServSock = 0;
311 
314 
315  // Control how to start the workers; copy-on-write (fork) is *very*
316  // experimental and available on Unix only.
318  if (gEnv->GetValue("ProofLite.ForkStartup", 0) != 0) {
319 #ifndef WIN32
321 #else
322  Warning("Init", "fork-based workers startup is not available on Windows - ignoring");
323 #endif
324  }
325 
326  fPackageLock = 0;
328  fLoadedMacros = 0;
330  if (TestBit(TProof::kIsClient)) {
331 
332  // List of directories where to look for global packages
333  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
334  if (globpack.Length() > 0) {
335  Int_t ng = 0;
336  Int_t from = 0;
337  TString ldir;
338  while (globpack.Tokenize(ldir, from, ":")) {
340  if (gSystem->AccessPathName(ldir, kReadPermission)) {
341  Warning("Init", "directory for global packages %s does not"
342  " exist or is not readable", ldir.Data());
343  } else {
344  // Add to the list, key will be "G<ng>", i.e. "G0", "G1", ...
345  TString key = Form("G%d", ng++);
346  if (!fGlobalPackageDirList) {
349  }
350  fGlobalPackageDirList->Add(new TNamed(key,ldir));
351  }
352  }
353  }
354 
355  TString lockpath(fPackageDir);
356  lockpath.ReplaceAll("/", "%");
358  fPackageLock = new TProofLockPath(lockpath.Data());
359 
361  fEnabledPackagesOnClient->SetOwner();
362  }
363 
364  // Start workers
365  if (SetupWorkers(0) != 0) {
366  Error("Init", "problems setting up workers");
367  return 0;
368  }
369 
370  // we are now properly initialized
371  fValid = kTRUE;
372 
373  // De-activate monitor (will be activated in Collect)
375 
376  // By default go into parallel mode
377  GoParallel(-1, kFALSE);
378 
379  // Send relevant initial state to slaves
381 
382  SetActive(kFALSE);
383 
384  if (IsValid()) {
385  // Activate input handler
387  // Set PROOF to running state
389  }
390  // We register the session as a socket so that cleanup is done properly
392  gROOT->GetListOfSockets()->Add(this);
393 
394  AskParallel();
395 
396  return fActiveSlaves->GetSize();
397 }
398 ////////////////////////////////////////////////////////////////////////////////
399 /// Destructor
400 
402 {
403  // Shutdown the workers
404  RemoveWorkers(0);
405 
406  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
407  // needed in case fQueryDir is on NFS ?!
408  gSystem->MakeDirectory(fQueryDir+"/.delete");
409  gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
410  }
411 
412  // Remove lock file
413  if (fQueryLock) {
415  fQueryLock->Unlock();
416  }
417 
421 
422  // Cleanup the socket
425 }
426 
427 ////////////////////////////////////////////////////////////////////////////////
428 /// Static method to determine the number of workers giving priority to users request.
429 /// Otherwise use the system information, if available, or just start
430 /// the minimal number, i.e. 2 .
431 
433 {
434  Bool_t notify = kFALSE;
435  if (fgWrksMax == -2) {
436  // Find the max number of workers, if any
437  TString sysname = "system.rootrc";
438 #ifdef ROOTETCDIR
439  char *s = gSystem->ConcatFileName(ROOTETCDIR, sysname);
440 #else
441  TString etc = gRootDir;
442 #ifdef WIN32
443  etc += "\\etc";
444 #else
445  etc += "/etc";
446 #endif
447  char *s = gSystem->ConcatFileName(etc, sysname);
448 #endif
449  TEnv sysenv(0);
450  sysenv.ReadFile(s, kEnvGlobal);
451  fgWrksMax = sysenv.GetValue("ProofLite.MaxWorkers", -1);
452  // Notify once the user if its will is changed
453  notify = kTRUE;
454  if (s) delete[] s;
455  }
456  if (fgWrksMax == 0) {
457  ::Error("TProofLite::GetNumberOfWorkers",
458  "PROOF-Lite disabled by the system administrator: sorry!");
459  return 0;
460  }
461 
462  TString nw;
463  Int_t nWorkers = -1;
464  Bool_t urlSetting = kFALSE;
465  if (url && strlen(url)) {
466  nw = url;
467  Int_t in = nw.Index("workers=");
468  if (in != kNPOS) {
469  nw.Remove(0, in + strlen("workers="));
470  while (!nw.IsDigit())
471  nw.Remove(nw.Length()-1);
472  if (!nw.IsNull()) {
473  if ((nWorkers = nw.Atoi()) <= 0) {
474  ::Warning("TProofLite::GetNumberOfWorkers",
475  "number of workers specified by 'workers='"
476  " is non-positive: using default");
477  } else {
478  urlSetting = kFALSE;
479  }
480  }
481  }
482  }
483  if (!urlSetting && fgProofEnvList) {
484  // Check PROOF_NWORKERS
485  TNamed *nm = (TNamed *) fgProofEnvList->FindObject("PROOF_NWORKERS");
486  if (nm) {
487  nw = nm->GetTitle();
488  if (nw.IsDigit()) {
489  if ((nWorkers = nw.Atoi()) == 0) {
490  ::Warning("TProofLite::GetNumberOfWorkers",
491  "number of workers specified by 'workers='"
492  " is non-positive: using default");
493  }
494  }
495  }
496  }
497  if (nWorkers <= 0) {
498  nWorkers = gEnv->GetValue("ProofLite.Workers", -1);
499  if (nWorkers <= 0) {
500  SysInfo_t si;
501  if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
502  nWorkers = si.fCpus;
503  } else {
504  // Two workers by default
505  nWorkers = 2;
506  }
507  if (notify) notify = kFALSE;
508  }
509  }
510  // Apply the max, if any
511  if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
512  if (notify)
513  ::Warning("TProofLite::GetNumberOfWorkers", "number of PROOF-Lite workers limited by"
514  " the system administrator to %d", fgWrksMax);
515  nWorkers = fgWrksMax;
516  }
517 
518  // Done
519  return nWorkers;
520 }
521 
522 ////////////////////////////////////////////////////////////////////////////////
523 /// Start up PROOF workers.
524 
526 {
527  // Create server socket on the assigned UNIX sock path
528  if (!fServSock) {
529  if ((fServSock = new TServerSocket(fSockPath))) {
531  // Remove from the list so that cleanup can be done in the correct order
532  gROOT->GetListOfSockets()->Remove(fServSock);
533  }
534  }
535  if (!fServSock || !fServSock->IsValid()) {
536  Error("SetupWorkers",
537  "unable to create server socket for internal communications");
539  return -1;
540  }
541 
542  // Create a monitor and add the socket to it
543  TMonitor *mon = new TMonitor;
544  mon->Add(fServSock);
545 
546  TList started;
547  TSlave *wrk = 0;
548  Int_t nWrksDone = 0, nWrksTot = -1;
549  TString fullord;
550 
551  if (opt == 0) {
552  nWrksTot = fForkStartup ? 1 : fNWorkers;
553  // Now we create the worker applications which will call us back to finalize
554  // the setup
555  Int_t ord = 0;
556  for (; ord < nWrksTot; ord++) {
557 
558  // Ordinal for this worker server
559  const char *o = (gProofServ) ? gProofServ->GetOrdinal() : "0";
560  fullord.Form("%s.%d", o, ord);
561 
562  // Create environment files
563  SetProofServEnv(fullord);
564 
565  // Create worker server and add to the list
566  if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
567  started.Add(wrk);
568 
569  // Notify
570  NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
571 
572  } //end of worker loop
573  } else {
574  if (!fForkStartup) {
575  Warning("SetupWorkers", "standard startup: workers already started");
576  return -1;
577  }
578  nWrksTot = fNWorkers - 1;
579  // Now we create the worker applications which will call us back to finalize
580  // the setup
581  TString clones;
582  Int_t ord = 0;
583  for (; ord < nWrksTot; ord++) {
584 
585  // Ordinal for this worker server
586  const char *o = (gProofServ) ? gProofServ->GetOrdinal() : "0";
587  fullord.Form("%s.%d", o, ord + 1);
588  if (!clones.IsNull()) clones += " ";
589  clones += fullord;
590 
591  // Create worker server and add to the list
592  if ((wrk = CreateSlave("lite", fullord, -1, fImage, fWorkDir)))
593  started.Add(wrk);
594 
595  // Notify
596  NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
597 
598  } //end of worker loop
599 
600  // Send the request
602  m << clones;
603  Broadcast(m, kActive);
604  }
605 
606  // Wait for call backs
607  nWrksDone = 0;
608  nWrksTot = started.GetSize();
609  Int_t nSelects = 0;
610  Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
611  while (started.GetSize() > 0 && nSelects < nWrksTot) {
612 
613  // Wait for activity on the socket for max 5 secs
614  TSocket *xs = mon->Select(to);
615 
616  // Count attempts and check
617  nSelects++;
618  if (xs == (TSocket *) -1) continue;
619 
620  // Get the connection
621  TSocket *s = fServSock->Accept();
622  if (s && s->IsValid()) {
623  // Receive ordinal
624  TMessage *msg = 0;
625  if (s->Recv(msg) < 0) {
626  Warning("SetupWorkers", "problems receiving message from accepted socket!");
627  } else {
628  if (msg) {
629  TString ord;
630  *msg >> ord;
631  // Find who is calling back
632  if ((wrk = (TSlave *) started.FindObject(ord))) {
633  // Remove it from the started list
634  started.Remove(wrk);
635 
636  // Assign tis socket the selected worker
637  wrk->SetSocket(s);
638  // Remove socket from global TROOT socket list. Only the TProof object,
639  // representing all worker sockets, will be added to this list. This will
640  // ensure the correct termination of all proof servers in case the
641  // root session terminates.
643  gROOT->GetListOfSockets()->Remove(s);
644  }
645  if (wrk->IsValid()) {
646  // Set the input handler
647  wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
648  // Set fParallel to 1 for workers since they do not
649  // report their fParallel with a LOG_DONE message
650  wrk->fParallel = 1;
651  // Finalize setup of the server
652  wrk->SetupServ(TSlave::kSlave, 0);
653  }
654 
655  // Monitor good workers
656  fSlaves->Add(wrk);
657  if (wrk->IsValid()) {
658  if (opt == 1) fActiveSlaves->Add(wrk);
659  fAllMonitor->Add(wrk->GetSocket());
660  // Record also in the list for termination
661  if (startedWorkers) startedWorkers->Add(wrk);
662  // Notify startup operations
663  NotifyStartUp("Setting up worker servers", ++nWrksDone, nWrksTot);
664  } else {
665  // Flag as bad
666  fBadSlaves->Add(wrk);
667  }
668  }
669  } else {
670  Warning("SetupWorkers", "received empty message from accepted socket!");
671  }
672  }
673  }
674  }
675 
676  // Cleanup the monitor and the server socket
677  mon->DeActivateAll();
678  delete mon;
679 
680  // Create Progress dialog, if needed
681  if (!gROOT->IsBatch() && !fProgressDialog) {
682  if ((fProgressDialog =
683  gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
684  if (fProgressDialog->LoadPlugin() == -1)
685  fProgressDialog = 0;
686  }
687 
688  if (opt == 1) {
689  // Collect replies
690  Collect(kActive);
691  // Update group view
692  SendGroupView();
693  // By default go into parallel mode
694  SetParallel(-1, 0);
695  }
696  // Done
697  return 0;
698 }
699 
700 ////////////////////////////////////////////////////////////////////////////////
701 /// Notify setting-up operation message
702 
703 void TProofLite::NotifyStartUp(const char *action, Int_t done, Int_t tot)
704 {
705  Int_t frac = (Int_t) (done*100.)/tot;
706  char msg[512] = {0};
707  if (frac >= 100) {
708  snprintf(msg, 512, "%s: OK (%d workers) \n",
709  action, tot);
710  } else {
711  snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
712  action, done, tot, frac);
713  }
714  fprintf(stderr,"%s", msg);
715 }
716 
717 ////////////////////////////////////////////////////////////////////////////////
718 /// Create environment files for worker 'ord'
719 
721 {
722  // Check input
723  if (!ord || strlen(ord) <= 0) {
724  Error("SetProofServEnv", "ordinal string undefined");
725  return -1;
726  }
727 
728  // ROOT env file
729  TString rcfile(Form("%s/worker-%s.rootrc", fWorkDir.Data(), ord));
730  FILE *frc = fopen(rcfile.Data(), "w");
731  if (!frc) {
732  Error("SetProofServEnv", "cannot open rc file %s", rcfile.Data());
733  return -1;
734  }
735 
736  // The session working dir depends on the role
737  fprintf(frc,"# The session working dir\n");
738  fprintf(frc,"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);
739 
740  // The session unique tag
741  fprintf(frc,"# Session tag\n");
742  fprintf(frc,"ProofServ.SessionTag: %s\n", GetName());
743 
744  // Log / Debug level
745  fprintf(frc,"# Proof Log/Debug level\n");
746  fprintf(frc,"Proof.DebugLevel: %d\n", gDebug);
747 
748  // Ordinal number
749  fprintf(frc,"# Ordinal number\n");
750  fprintf(frc,"ProofServ.Ordinal: %s\n", ord);
751 
752  // ROOT Version tag
753  fprintf(frc,"# ROOT Version tag\n");
754  fprintf(frc,"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());
755 
756  // Work dir
757  TString sandbox = fSandbox;
758  if (GetSandbox(sandbox, kFALSE, "ProofServ.Sandbox") != 0)
759  Warning("SetProofServEnv", "problems getting sandbox string for worker");
760  fprintf(frc,"# Users sandbox\n");
761  fprintf(frc, "ProofServ.Sandbox: %s\n", sandbox.Data());
762 
763  // Cache dir
764  fprintf(frc,"# Users cache\n");
765  fprintf(frc, "ProofServ.CacheDir: %s\n", fCacheDir.Data());
766 
767  // Package dir
768  fprintf(frc,"# Users packages\n");
769  fprintf(frc, "ProofServ.PackageDir: %s\n", fPackageDir.Data());
770 
771  // Image
772  fprintf(frc,"# Server image\n");
773  fprintf(frc, "ProofServ.Image: %s\n", fImage.Data());
774 
775  // Set Open socket
776  fprintf(frc,"# Open socket\n");
777  fprintf(frc, "ProofServ.OpenSock: %s\n", fSockPath.Data());
778 
779  // Client Protocol
780  fprintf(frc,"# Client Protocol\n");
781  fprintf(frc, "ProofServ.ClientVersion: %d\n", kPROOF_Protocol);
782 
783  // ROOT env file created
784  fclose(frc);
785 
786  // System env file
787  TString envfile(Form("%s/worker-%s.env", fWorkDir.Data(), ord));
788  FILE *fenv = fopen(envfile.Data(), "w");
789  if (!fenv) {
790  Error("SetProofServEnv", "cannot open env file %s", envfile.Data());
791  return -1;
792  }
793  // ROOTSYS
794 #ifdef R__HAVE_CONFIG
795  fprintf(fenv, "export ROOTSYS=%s\n", ROOTPREFIX);
796 #else
797  fprintf(fenv, "export ROOTSYS=%s\n", gSystem->Getenv("ROOTSYS"));
798 #endif
799  // Conf dir
800 #ifdef R__HAVE_CONFIG
801  fprintf(fenv, "export ROOTCONFDIR=%s\n", ROOTETCDIR);
802 #else
803  fprintf(fenv, "export ROOTCONFDIR=%s\n", gSystem->Getenv("ROOTSYS"));
804 #endif
805  // TMPDIR
806  fprintf(fenv, "export TMPDIR=%s\n", gSystem->TempDirectory());
807  // Log file in the log dir
808  TString logfile(Form("%s/worker-%s.log", fWorkDir.Data(), ord));
809  fprintf(fenv, "export ROOTPROOFLOGFILE=%s\n", logfile.Data());
810  // RC file
811  fprintf(fenv, "export ROOTRCFILE=%s\n", rcfile.Data());
812  // ROOT version tag (needed in building packages)
813  fprintf(fenv, "export ROOTVERSIONTAG=%s\n", gROOT->GetVersion());
814  // This flag can be used to identify the type of worker; for example, in BUILD.sh or SETUP.C ...
815  fprintf(fenv, "export ROOTPROOFLITE=%d\n", fNWorkers);
816  // Local files are on the local file system
817  fprintf(fenv, "export LOCALDATASERVER=\"file://\"\n");
818  // Set the user envs
819  if (fgProofEnvList) {
820  TString namelist;
821  TIter nxenv(fgProofEnvList);
822  TNamed *env = 0;
823  while ((env = (TNamed *)nxenv())) {
824  TString senv(env->GetTitle());
825  ResolveKeywords(senv, ord, logfile.Data());
826  fprintf(fenv, "export %s=%s\n", env->GetName(), senv.Data());
827  if (namelist.Length() > 0)
828  namelist += ',';
829  namelist += env->GetName();
830  }
831  fprintf(fenv, "export PROOF_ALLVARS=%s\n", namelist.Data());
832  }
833 
834  // System env file created
835  fclose(fenv);
836 
837  // Done
838  return 0;
839 }
840 
841 ////////////////////////////////////////////////////////////////////////////////
842 /// Resolve some keywords in 's'
843 /// <logfilewrk>, <user>, <rootsys>, <cpupin>
844 
846  const char *logfile)
847 {
848  if (!logfile) return;
849 
850  // Log file
851  if (s.Contains("<logfilewrk>") && logfile) {
852  TString lfr(logfile);
853  if (lfr.EndsWith(".log")) lfr.Remove(lfr.Last('.'));
854  s.ReplaceAll("<logfilewrk>", lfr.Data());
855  }
856 
857  // user
858  if (gSystem->Getenv("USER") && s.Contains("<user>")) {
859  s.ReplaceAll("<user>", gSystem->Getenv("USER"));
860  }
861 
862  // rootsys
863  if (gSystem->Getenv("ROOTSYS") && s.Contains("<rootsys>")) {
864  s.ReplaceAll("<rootsys>", gSystem->Getenv("ROOTSYS"));
865  }
866 
867  // cpupin: pin to this CPU num (from 0 to ncpus-1)
868  if (s.Contains("<cpupin>")) {
869  TString o = ord;
870  Int_t n = o.Index('.');
871  if (n != kNPOS) {
872 
873  o.Remove(0, n+1);
874  n = o.Atoi(); // n is ord
875 
876  TString cpuPinList;
877  {
878  const TList *envVars = GetEnvVars();
879  TNamed *var;
880  if (envVars) {
881  var = dynamic_cast<TNamed *>(envVars->FindObject("PROOF_SLAVE_CPUPIN_ORDER"));
882  if (var) cpuPinList = var->GetTitle();
883  }
884  }
885 
886  UInt_t nCpus = 1;
887  {
888  SysInfo_t si;
889  if (gSystem->GetSysInfo(&si) == 0 && (si.fCpus > 0))
890  nCpus = si.fCpus;
891  else nCpus = 1; // fallback
892  }
893 
894  if (cpuPinList.IsNull() || (cpuPinList == "*")) {
895  // Use processors in order
896  n = n % nCpus;
897  }
898  else {
899  // Use processors in user's order
900  // n is now the ordinal, converting to idx
901  n = n % (cpuPinList.CountChar('+')+1);
902  TString tok;
903  Ssiz_t from = 0;
904  for (Int_t i=0; cpuPinList.Tokenize(tok, from, "\\+"); i++) {
905  if (i == n) {
906  n = (tok.Atoi() % nCpus);
907  break;
908  }
909  }
910  }
911 
912  o.Form("%d", n);
913  }
914  else {
915  o = "0"; // should not happen
916  }
917  s.ReplaceAll("<cpupin>", o);
918  }
919 }
920 
921 ////////////////////////////////////////////////////////////////////////////////
922 /// Create the sandbox for this session
923 
925 {
926  // Make sure the sandbox area exist and is writable
927  if (GetSandbox(fSandbox, kTRUE, "ProofLite.Sandbox") != 0) return -1;
928 
929  // Package Dir
930  fPackageDir = gEnv->GetValue("Proof.PackageDir", "");
931  if (fPackageDir.IsNull())
933  if (AssertPath(fPackageDir, kTRUE) != 0) return -1;
934 
935  // Cache Dir
936  fCacheDir = gEnv->GetValue("Proof.CacheDir", "");
937  if (fCacheDir.IsNull())
939  if (AssertPath(fCacheDir, kTRUE) != 0) return -1;
940 
941  // Data Set Dir
942  fDataSetDir = gEnv->GetValue("Proof.DataSetDir", "");
943  if (fDataSetDir.IsNull())
945  if (AssertPath(fDataSetDir, kTRUE) != 0) return -1;
946 
947  // Session unique tag (name of this TProof instance)
948  TString stag;
949  stag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
950  SetName(stag.Data());
951 
952  Int_t subpath = gEnv->GetValue("ProofLite.SubPath", 1);
953  // Subpath for this session in the fSandbox (<sandbox>/path-to-working-dir)
954  TString sessdir;
955  if (subpath != 0) {
956  sessdir = gSystem->WorkingDirectory();
957  sessdir.ReplaceAll(gSystem->HomeDirectory(),"");
958  sessdir.ReplaceAll("/","-");
959  sessdir.Replace(0,1,"/",1);
960  sessdir.Insert(0, fSandbox.Data());
961  } else {
962  // USe the sandbox
963  sessdir = fSandbox;
964  }
965 
966  // Session working and queries dir
967  fWorkDir.Form("%s/session-%s", sessdir.Data(), stag.Data());
968  if (AssertPath(fWorkDir, kTRUE) != 0) return -1;
969 
970  // Create symlink to the last session
971  TString lastsess;
972  lastsess.Form("%s/last-lite-session", sessdir.Data());
973  gSystem->Unlink(lastsess);
974  gSystem->Symlink(fWorkDir, lastsess);
975 
976  // Queries Dir: local to the working dir, unless required differently
977  fQueryDir = gEnv->GetValue("Proof.QueryDir", "");
978  if (fQueryDir.IsNull())
979  fQueryDir.Form("%s/%s", sessdir.Data(), kPROOF_QueryDir);
980  if (AssertPath(fQueryDir, kTRUE) != 0) return -1;
981 
982  // Cleanup old sessions dirs
983  CleanupSandbox();
984 
985  // Done
986  return 0;
987 }
988 
989 ////////////////////////////////////////////////////////////////////////////////
990 /// Print status of PROOF-Lite cluster.
991 
992 void TProofLite::Print(Option_t *option) const
993 {
994  TString ord;
995  if (gProofServ) ord.Form("%s ", gProofServ->GetOrdinal());
996  if (IsParallel())
997  Printf("*** PROOF-Lite cluster %s(parallel mode, %d workers):", ord.Data(), GetParallel());
998  else
999  Printf("*** PROOF-Lite cluster %s(sequential mode)", ord.Data());
1000 
1001  if (gProofServ) {
1002  TString url(gSystem->HostName());
1003  // Add port to URL, if defined
1004  Int_t port = gEnv->GetValue("ProofServ.XpdPort", 1093);
1005  if (port > -1) url.Form("%s:%d",gSystem->HostName(), port);
1006  Printf("URL: %s", url.Data());
1007  } else {
1008  Printf("Host name: %s", gSystem->HostName());
1009  }
1010  Printf("User: %s", GetUser());
1011  TString ver(gROOT->GetVersion());
1012  ver += TString::Format("|%s", gROOT->GetGitCommit());
1013  if (gSystem->Getenv("ROOTVERSIONTAG"))
1014  ver += TString::Format("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
1015  Printf("ROOT version|rev|tag: %s", ver.Data());
1016  Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
1018  Printf("Protocol version: %d", GetClientProtocol());
1019  Printf("Working directory: %s", gSystem->WorkingDirectory());
1020  Printf("Communication path: %s", fSockPath.Data());
1021  Printf("Log level: %d", GetLogLevel());
1022  Printf("Number of workers: %d", GetNumberOfSlaves());
1023  Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
1024  Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
1025  Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
1026  Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
1027  Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
1028  Printf("Total real time used (s): %.3f", GetRealTime());
1029  Printf("Total CPU time used (s): %.3f", GetCpuTime());
1030  if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
1031  Printf("List of workers:");
1032  TIter nextslave(fSlaves);
1033  while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
1034  if (sl->IsValid())
1035  sl->Print(option);
1036  }
1037  }
1038 }
1039 
1040 ////////////////////////////////////////////////////////////////////////////////
1041 /// Create a TProofQueryResult instance for this query.
1042 
1044  Long64_t fst, TDSet *dset,
1045  const char *selec)
1046 {
1047  // Increment sequential number
1048  Int_t seqnum = -1;
1049  if (fQMgr) {
1051  seqnum = fQMgr->SeqNum();
1052  }
1053 
1054  // Create the instance and add it to the list
1055  TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt,
1056  fPlayer->GetInputList(), nent,
1057  fst, dset, selec,
1058  (dset ? dset->GetEntryList() : 0));
1059  // Title is the session identifier
1060  pqr->SetTitle(GetName());
1061 
1062  return pqr;
1063 }
1064 
1065 ////////////////////////////////////////////////////////////////////////////////
1066 /// Set query in running state.
1067 
1069 {
1070  // Record current position in the log file at start
1071  fflush(fLogFileW);
1072  Int_t startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
1073 
1074  // Add some header to logs
1075  Printf(" ");
1076  Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
1077 
1078  // Build the list of loaded PAR packages
1079  TString parlist = "";
1081  TObjString *os= 0;
1082  while ((os = (TObjString *)nxp())) {
1083  if (parlist.Length() <= 0)
1084  parlist = os->GetName();
1085  else
1086  parlist += Form(";%s",os->GetName());
1087  }
1088 
1089  // Set in running state
1090  pq->SetRunning(startlog, parlist, GetParallel());
1091 
1092  // Bytes and CPU at start (we will calculate the differential at end)
1093  AskStatistics();
1095 }
1096 
1097 ////////////////////////////////////////////////////////////////////////////////
1098 /// Execute the specified drawing action on a data set (TDSet).
1099 /// Event- or Entry-lists should be set in the data set object using
1100 /// TDSet::SetEntryList.
1101 /// Returns -1 in case of error or number of selected events otherwise.
1102 
1103 Long64_t TProofLite::DrawSelect(TDSet *dset, const char *varexp,
1104  const char *selection, Option_t *option,
1106 {
1107  if (!IsValid()) return -1;
1108 
1109  // Make sure that asynchronous processing is not active
1110  if (!IsIdle()) {
1111  Info("DrawSelect","not idle, asynchronous Draw not supported");
1112  return -1;
1113  }
1114  TString opt(option);
1115  Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
1116  if (idx != kNPOS)
1117  opt.Replace(idx,4,"");
1118 
1119  // Fill the internal variables
1120  fVarExp = varexp;
1121  fSelection = selection;
1122 
1123  return Process(dset, "draw:", opt, nentries, first);
1124 }
1125 
1126 ////////////////////////////////////////////////////////////////////////////////
1127 /// Process a data set (TDSet) using the specified selector (.C) file.
1128 /// Entry- or event-lists should be set in the data set object using
1129 /// TDSet::SetEntryList.
1130 /// The return value is -1 in case of error and TSelector::GetStatus() in
1131 /// in case of success.
1132 
1133 Long64_t TProofLite::Process(TDSet *dset, const char *selector, Option_t *option,
1135 {
1136  // For the time being cannot accept other queries if not idle, even if in async
1137  // mode; needs to set up an event handler to manage that
1138 
1139  TString opt(option), optfb, outfile;
1140  // Enable feedback, if required
1141  if (opt.Contains("fb=") || opt.Contains("feedback=")) SetFeedback(opt, optfb, 0);
1142  // Define output file, either from 'opt' or the default one
1143  if (HandleOutputOptions(opt, outfile, 0) != 0) return -1;
1144 
1145  // Resolve query mode
1146  fSync = (GetQueryMode(opt) == kSync);
1147  if (!fSync) {
1148  Info("Process","asynchronous mode not yet supported in PROOF-Lite");
1149  return -1;
1150  }
1151 
1152  if (!IsIdle()) {
1153  // Notify submission
1154  Info("Process", "not idle: cannot accept queries");
1155  return -1;
1156  }
1157 
1158  // Cleanup old temporary datasets
1159  if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
1161  fRunningDSets->Delete();
1162  }
1163 
1164  if (!IsValid() || !fQMgr || !fPlayer) {
1165  Error("Process", "invalid sesion or query-result manager undefined!");
1166  return -1;
1167  }
1168 
1169  // Make sure that all enabled workers get some work, unless stated
1170  // differently
1171  if (!fPlayer->GetInputList()->FindObject("PROOF_MaxSlavesPerNode"))
1172  SetParameter("PROOF_MaxSlavesPerNode", (Long_t)0);
1173 
1174  Bool_t hasNoData = (!dset || (dset && dset->TestBit(TDSet::kEmpty))) ? kTRUE : kFALSE;
1175 
1176  // If just a name was given to identify the dataset, retrieve it from the
1177  // local files
1178  // Make sure the dataset contains the information needed
1179  TString emsg;
1180  if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
1181  if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
1182  Error("Process", "from AssertDataSet: %s", emsg.Data());
1183  return -1;
1184  }
1185  if (dset->GetListOfElements()->GetSize() == 0) {
1186  Error("Process", "no files to process!");
1187  return -1;
1188  }
1189  } else if (hasNoData) {
1190  // Check if we are required to process with TPacketizerFile a registered dataset
1191  TNamed *ftp = dynamic_cast<TNamed *>(fPlayer->GetInputList()->FindObject("PROOF_FilesToProcess"));
1192  if (ftp) {
1193  TString dsn(ftp->GetTitle());
1194  if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
1195  dsn.ReplaceAll("dataset:", "");
1196  // Make sure we have something in input and a dataset manager
1197  if (!fDataSetManager) {
1198  emsg.Form("dataset manager not initialized!");
1199  } else {
1200  TFileCollection *fc = 0;
1201  // Get the dataset
1202  if (!(fc = fDataSetManager->GetDataSet(dsn))) {
1203  emsg.Form("requested dataset '%s' does not exists", dsn.Data());
1204  } else {
1205  TMap *fcmap = TProofServ::GetDataSetNodeMap(fc, emsg);
1206  if (fcmap) {
1207  fPlayer->GetInputList()->Remove(ftp);
1208  delete ftp;
1209  fcmap->SetOwner(kTRUE);
1210  fcmap->SetName("PROOF_FilesToProcess");
1211  fPlayer->GetInputList()->Add(fcmap);
1212  }
1213  }
1214  }
1215  if (!emsg.IsNull()) {
1216  Error("HandleProcess", "%s", emsg.Data());
1217  return -1;
1218  }
1219  }
1220  }
1221  }
1222 
1223  TString selec(selector), varexp, selection, objname;
1224  // If a draw query, extract the relevant info
1225  if (selec.BeginsWith("draw:")) {
1226  varexp = fVarExp;
1227  selection = fSelection;
1228  // Decode now the expression
1229  if (fPlayer->GetDrawArgs(varexp, selection, opt, selec, objname) != 0) {
1230  Error("Process", "draw query: error parsing arguments '%s', '%s', '%s'",
1231  varexp.Data(), selection.Data(), opt.Data());
1232  return -1;
1233  }
1234  }
1235 
1236  // Create instance of query results (the data set is added after Process)
1237  TProofQueryResult *pq = MakeQueryResult(nentries, opt, first, 0, selec);
1238 
1239  // Check if queries must be saved into files
1240  // Automatic saving is controlled by ProofLite.AutoSaveQueries
1241  Bool_t savequeries =
1242  (!strcmp(gEnv->GetValue("ProofLite.AutoSaveQueries", "off"), "on")) ? kTRUE : kFALSE;
1243 
1244  // Keep queries in memory and how many (-1 = all, 0 = none, ...)
1245  Int_t memqueries = gEnv->GetValue("ProofLite.MaxQueriesMemory", 1);
1246 
1247  // If not a draw action add the query to the main list
1248  if (!(pq->IsDraw())) {
1249  if (fQMgr->Queries()) {
1250  if (memqueries != 0) fQMgr->Queries()->Add(pq);
1251  if (memqueries >= 0 && fQMgr->Queries()->GetSize() > memqueries) {
1252  // Remove oldest
1253  TObject *qfst = fQMgr->Queries()->First();
1254  fQMgr->Queries()->Remove(qfst);
1255  delete qfst;
1256  }
1257  }
1258  // Also save it to queries dir
1259  if (savequeries) fQMgr->SaveQuery(pq);
1260  }
1261 
1262  // Set the query number
1263  fSeqNum = pq->GetSeqNum();
1264 
1265  // Set in running state
1266  SetQueryRunning(pq);
1267 
1268  // Save to queries dir, if not standard draw
1269  if (!(pq->IsDraw())) {
1270  if (savequeries) fQMgr->SaveQuery(pq);
1271  } else {
1273  }
1274 
1275  // Start or reset the progress dialog
1276  if (!gROOT->IsBatch()) {
1277  Int_t dsz = (dset && dset->GetListOfElements()) ? dset->GetListOfElements()->GetSize() : -1;
1278  if (fProgressDialog &&
1280  if (!fProgressDialogStarted) {
1281  fProgressDialog->ExecPlugin(5, this, selec.Data(), dsz,
1282  first, nentries);
1284  } else {
1285  ResetProgressDialog(selec.Data(), dsz, first, nentries);
1286  }
1287  }
1289  }
1290 
1291  // Add query results to the player lists
1292  if (!(pq->IsDraw()))
1293  fPlayer->AddQueryResult(pq);
1294 
1295  // Set query currently processed
1296  fPlayer->SetCurrentQuery(pq);
1297 
1298  // Make sure the unique query tag is available as TNamed object in the
1299  // input list so that it can be used in TSelectors for monitoring
1300  TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
1301  if (qtag) {
1302  qtag->SetTitle(Form("%s:%s",pq->GetTitle(),pq->GetName()));
1303  } else {
1304  TObject *o = fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
1305  if (o) fPlayer->GetInputList()->Remove(o);
1306  fPlayer->AddInput(new TNamed("PROOF_QueryTag",
1307  Form("%s:%s",pq->GetTitle(),pq->GetName())));
1308  }
1309 
1310  // Set PROOF to running state
1312 
1313  // deactivate the default application interrupt handler
1314  // ctrl-c's will be forwarded to PROOF to stop the processing
1315  TSignalHandler *sh = 0;
1316  if (fSync) {
1317  if (gApplication)
1319  }
1320 
1321  // Make sure we get a fresh result
1322  fOutputList.Clear();
1323 
1324  // Start the additional workers now if using fork-based startup
1325  TList *startedWorkers = 0;
1326  if (fForkStartup) {
1327  startedWorkers = new TList;
1328  startedWorkers->SetOwner(kFALSE);
1329  SetupWorkers(1, startedWorkers);
1330  }
1331 
1332  // This is the end of preparation
1333  fQuerySTW.Reset();
1334 
1335  Long64_t rv = 0;
1336  if (!(pq->IsDraw())) {
1337  if (selector && strlen(selector)) {
1338  rv = fPlayer->Process(dset, selec, opt, nentries, first);
1339  } else {
1340  rv = fPlayer->Process(dset, fSelector, opt, nentries, first);
1341  }
1342  } else {
1343  rv = fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
1344  }
1345 
1346  // This is the end of merging
1347  fQuerySTW.Stop();
1348  Float_t rt = fQuerySTW.RealTime();
1349  // Update the query content
1350  TQueryResult *qr = GetQueryResult();
1351  if (qr) {
1352  qr->SetTermTime(rt);
1353  // Preparation time is always null in PROOF-Lite
1354  }
1355 
1356  // Disable feedback, if required
1357  if (!optfb.IsNull()) SetFeedback(opt, optfb, 1);
1358 
1359  if (fSync) {
1360 
1361  // Terminate additional workers if using fork-based startup
1362  if (fForkStartup && startedWorkers) {
1363  RemoveWorkers(startedWorkers);
1364  SafeDelete(startedWorkers);
1365  }
1366 
1367  // reactivate the default application interrupt handler
1368  if (sh)
1370 
1371  // Return number of events processed
1374  ? kTRUE : kFALSE;
1375  if (abort) fPlayer->StopProcess(kTRUE);
1376  Emit("StopProcess(Bool_t)", abort);
1377  }
1378 
1379  // In PROOFLite this has to be done once only in TProofLite::Process
1381  // If the last object, notify the GUI that the result arrived
1382  QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
1383  // Processing is over
1384  UpdateDialog();
1385 
1386  // Save the data set into the TQueryResult (should be done after Process to avoid
1387  // improper deletion during collection)
1388  if (rv == 0 && dset && !dset->TestBit(TDSet::kEmpty) && pq->GetInputList()) {
1389  pq->GetInputList()->Add(dset);
1390  if (dset->GetEntryList())
1391  pq->GetInputList()->Add(dset->GetEntryList());
1392  }
1393 
1394  // Register any dataset produced during this processing, if required
1396  TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
1397  if (psr) {
1398  TString err;
1400  fPlayer->GetOutputList(), fDataSetManager, err) != 0)
1401  Warning("ProcessNext", "problems registering produced datasets: %s", err.Data());
1402  fPlayer->GetOutputList()->Remove(psr);
1403  delete psr;
1404  }
1405  }
1406 
1407  // Complete filling of the TQueryResult instance
1408  AskStatistics();
1409  if (!(pq->IsDraw())) {
1410  if (fQMgr->FinalizeQuery(pq, this, fPlayer)) {
1411  if (savequeries) fQMgr->SaveQuery(pq, -1);
1412  }
1413  }
1414 
1415  // Remove aborted queries from the list
1418  if (fQMgr) fQMgr->RemoveQuery(pq);
1419  } else {
1420  // If the last object, notify the GUI that the result arrived
1421  QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
1422  // Keep in memory only light info about a query
1423  if (!(pq->IsDraw()) && memqueries >= 0) {
1424  if (fQMgr && fQMgr->Queries()) {
1425  TQueryResult *pqr = pq->CloneInfo();
1426  if (pqr) fQMgr->Queries()->Add(pqr);
1427  // Remove from the fQueries list
1428  fQMgr->Queries()->Remove(pq);
1429  }
1430  }
1431  // To get the prompt back
1432  TString msg;
1433  msg.Form("Lite-0: all output objects have been merged ");
1434  fprintf(stderr, "%s\n", msg.Data());
1435  }
1436  // Save the performance info, if required
1437  if (!fPerfTree.IsNull()) {
1438  if (SavePerfTree() != 0) Error("Process", "saving performance info ...");
1439  // Must be re-enabled each time
1440  SetPerfTree(0);
1441  }
1442  }
1443  // Finalise output file settings (opt is ignored in here)
1444  if (HandleOutputOptions(opt, outfile, 1) != 0) return -1;
1445 
1446  // Retrieve status from the output list
1447  if (rv >= 0) {
1448  TParameter<Long64_t> *sst =
1449  (TParameter<Long64_t> *) fOutputList.FindObject("PROOF_SelectorStatus");
1450  if (sst) rv = sst->GetVal();
1451  }
1452 
1453 
1454  // Done
1455  return rv;
1456 }
1457 
1458 ////////////////////////////////////////////////////////////////////////////////
1459 /// Create in each worker sandbox symlinks to the files in the list
1460 /// Used to make the cache information available to workers.
1461 
1463 {
1464  Int_t rc = 0;
1465  if (files) {
1466  TList *wls = (wrks) ? wrks : fActiveSlaves;
1467  TIter nxf(files);
1468  TObjString *os = 0;
1469  while ((os = (TObjString *) nxf())) {
1470  // Expand target
1471  TString tgt(os->GetName());
1472  gSystem->ExpandPathName(tgt);
1473  // Loop over active workers
1474  TIter nxw(wls);
1475  TSlave *wrk = 0;
1476  while ((wrk = (TSlave *) nxw())) {
1477  // Link name
1478  TString lnk = Form("%s/%s", wrk->GetWorkDir(), gSystem->BaseName(os->GetName()));
1479  gSystem->Unlink(lnk);
1480  if (gSystem->Symlink(tgt, lnk) != 0) {
1481  rc++;
1482  Warning("CreateSymLinks", "problems creating sym link: %s", lnk.Data());
1483  } else {
1484  PDB(kGlobal,1)
1485  Info("CreateSymLinks", "created sym link: %s", lnk.Data());
1486  }
1487  }
1488  }
1489  } else {
1490  Warning("CreateSymLinks", "files list is undefined");
1491  }
1492  // Done
1493  return rc;
1494 }
1495 
1496 ////////////////////////////////////////////////////////////////////////////////
1497 /// Initialize the dataset manager from directives or from defaults
1498 /// Return 0 on success, -1 on failure
1499 
1501 {
1502  fDataSetManager = 0;
1503 
1504  // Default user and group
1505  TString user("???"), group("default");
1506  UserGroup_t *pw = gSystem->GetUserInfo();
1507  if (pw) {
1508  user = pw->fUser;
1509  delete pw;
1510  }
1511 
1512  // Dataset manager instance via plug-in
1513  TPluginHandler *h = 0;
1514  TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
1515  if (!dsm.IsNull()) {
1516  // Get plugin manager to load the appropriate TDataSetManager
1517  if (gROOT->GetPluginManager()) {
1518  // Find the appropriate handler
1519  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
1520  if (h && h->LoadPlugin() != -1) {
1521  // make instance of the dataset manager
1522  fDataSetManager =
1523  reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, group.Data(),
1524  user.Data(), dsm.Data()));
1525  }
1526  }
1527  }
1529  Warning("InitDataSetManager", "dataset manager plug-in initialization failed");
1531  }
1532 
1533  // If no valid dataset manager has been created we instantiate the default one
1534  if (!fDataSetManager) {
1535  TString opts("Av:");
1536  TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
1537  if (dsetdir.IsNull()) {
1538  // Use the default in the sandbox
1539  dsetdir = fDataSetDir;
1540  opts += "Sb:";
1541  }
1542  // Find the appropriate handler
1543  if (!h) {
1544  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
1545  if (h && h->LoadPlugin() == -1) h = 0;
1546  }
1547  if (h) {
1548  // make instance of the dataset manager
1549  fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
1550  group.Data(), user.Data(),
1551  Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
1552  }
1554  Warning("InitDataSetManager", "default dataset manager plug-in initialization failed");
1556  }
1557  }
1558 
1559  if (gDebug > 0 && fDataSetManager) {
1560  Info("InitDataSetManager", "datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
1566  }
1567 
1568  // Dataset manager for staging requests
1569  TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
1570  if (!dsReqCfg.IsNull()) {
1571  TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
1572 
1573  if (reReqDir.Match(dsReqCfg) == 5) {
1574  TString dsDirFmt;
1575  dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
1576  fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_", dsDirFmt);
1578  Warning("InitDataSetManager", "failed init of dataset staging requests repository");
1580  }
1581  } else {
1582  Warning("InitDataSetManager", "specify, with [dir:]<path>, a valid path for staging requests");
1583  }
1584  } else if (gDebug > 0) {
1585  Warning("InitDataSetManager", "no repository for staging requests available");
1586  }
1587 
1588  // Done
1589  return (fDataSetManager ? 0 : -1);
1590 }
1591 
1592 ////////////////////////////////////////////////////////////////////////////////
1593 /// List contents of file cache. If all is true show all caches also on
1594 /// slaves. If everything is ok all caches are to be the same.
1595 
1597 {
1598  if (!IsValid()) return;
1599 
1600  Printf("*** Local file cache %s ***", fCacheDir.Data());
1601  gSystem->Exec(Form("%s %s", kLS, fCacheDir.Data()));
1602 }
1603 
1604 ////////////////////////////////////////////////////////////////////////////////
1605 /// Remove files from all file caches.
1606 
1607 void TProofLite::ClearCache(const char *file)
1608 {
1609  if (!IsValid()) return;
1610 
1611  fCacheLock->Lock();
1612  if (!file || strlen(file) <= 0) {
1613  gSystem->Exec(Form("%s %s/*", kRM, fCacheDir.Data()));
1614  } else {
1615  gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file));
1616  }
1617  fCacheLock->Unlock();
1618 }
1619 
1620 ////////////////////////////////////////////////////////////////////////////////
1621 /// Copy the specified macro in the cache directory. The macro file is
1622 /// uploaded if new or updated. If existing, the corresponding header
1623 /// basename(macro).h or .hh, is also uploaded. For the other arguments
1624 /// see TProof::Load().
1625 /// Returns 0 in case of success and -1 in case of error.
1626 
1627 Int_t TProofLite::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueOnly,
1628  TList *wrks)
1629 {
1630  if (!IsValid()) return -1;
1631 
1632  if (!macro || !macro[0]) {
1633  Error("Load", "need to specify a macro name");
1634  return -1;
1635  }
1636 
1637  TString macs(macro), mac;
1638  Int_t from = 0;
1639  while (macs.Tokenize(mac, from, ",")) {
1640  if (IsIdle()) {
1641  if (CopyMacroToCache(mac) < 0) return -1;
1642  } else {
1643  // The name
1644  TString macn = gSystem->BaseName(mac);
1645  macn.Remove(macn.Last('.'));
1646  // Relevant pointers
1647  TList cachedFiles;
1648  TString cacheDir = fCacheDir;
1649  gSystem->ExpandPathName(cacheDir);
1650  void * dirp = gSystem->OpenDirectory(cacheDir);
1651  if (dirp) {
1652  const char *e = 0;
1653  while ((e = gSystem->GetDirEntry(dirp))) {
1654  if (!strncmp(e, macn.Data(), macn.Length())) {
1655  TString fncache = Form("%s/%s", cacheDir.Data(), e);
1656  cachedFiles.Add(new TObjString(fncache.Data()));
1657  }
1658  }
1659  gSystem->FreeDirectory(dirp);
1660  }
1661  // Create the relevant symlinks
1662  CreateSymLinks(&cachedFiles, wrks);
1663  }
1664  }
1665 
1666  return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
1667 }
1668 
1669 ////////////////////////////////////////////////////////////////////////////////
1670 /// Copy a macro, and its possible associated .h[h] file,
1671 /// to the cache directory, from where the workers can get the file.
1672 /// If headerRequired is 1, return -1 in case the header is not found.
1673 /// If headerRequired is 0, try to copy header too.
1674 /// If headerRequired is -1, don't look for header, only copy macro.
1675 /// If the selector pionter is not 0, consider the macro to be a selector
1676 /// and try to load the selector and set it to the pointer.
1677 /// The mask 'opt' is an or of ESendFileOpt:
1678 /// kCpBin (0x8) Retrieve from the cache the binaries associated
1679 /// with the file
1680 /// kCp (0x10) Retrieve the files from the cache
1681 /// Return -1 in case of error, 0 otherwise.
1682 
1683 Int_t TProofLite::CopyMacroToCache(const char *macro, Int_t headerRequired,
1684  TSelector **selector, Int_t opt, TList *wrks)
1685 {
1686  // Relevant pointers
1687  TString cacheDir = fCacheDir;
1688  gSystem->ExpandPathName(cacheDir);
1689  TProofLockPath *cacheLock = fCacheLock;
1690 
1691  // Split out the aclic mode, if any
1692  TString name = macro;
1693  TString acmode, args, io;
1694  name = gSystem->SplitAclicMode(name, acmode, args, io);
1695 
1696  PDB(kGlobal,1)
1697  Info("CopyMacroToCache", "enter: names: %s, %s", macro, name.Data());
1698 
1699  // Make sure that the file exists
1700  if (gSystem->AccessPathName(name, kReadPermission)) {
1701  Error("CopyMacroToCache", "file %s not found or not readable", name.Data());
1702  return -1;
1703  }
1704 
1705  // Update the macro path
1707  TString np(gSystem->DirName(name));
1708  if (!np.IsNull()) {
1709  np += ":";
1710  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
1711  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
1712  mp.Insert(ip, np);
1713  TROOT::SetMacroPath(mp);
1714  PDB(kGlobal,1)
1715  Info("CopyMacroToCache", "macro path set to '%s'", TROOT::GetMacroPath());
1716  }
1717  }
1718 
1719  // Check the header file
1720  Int_t dot = name.Last('.');
1721  const char *hext[] = { ".h", ".hh", "" };
1722  TString hname, checkedext;
1723  Int_t i = 0;
1724  while (strlen(hext[i]) > 0) {
1725  hname = name(0, dot);
1726  hname += hext[i];
1727  if (!gSystem->AccessPathName(hname, kReadPermission))
1728  break;
1729  if (!checkedext.IsNull()) checkedext += ",";
1730  checkedext += hext[i];
1731  hname = "";
1732  i++;
1733  }
1734  if (hname.IsNull() && headerRequired == 1) {
1735  Error("CopyMacroToCache", "header file for %s not found or not readable "
1736  "(checked extensions: %s)", name.Data(), checkedext.Data());
1737  return -1;
1738  }
1739  if (headerRequired < 0)
1740  hname = "";
1741 
1742  cacheLock->Lock();
1743 
1744  // Check these files with those in the cache (if any)
1745  Bool_t useCacheBinaries = kFALSE;
1746  TString cachedname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(name));
1747  TString cachedhname;
1748  if (!hname.IsNull())
1749  cachedhname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
1750  if (!gSystem->AccessPathName(cachedname, kReadPermission)) {
1751  TMD5 *md5 = TMD5::FileChecksum(name);
1752  TMD5 *md5cache = TMD5::FileChecksum(cachedname);
1753  if (md5 && md5cache && (*md5 == *md5cache))
1754  useCacheBinaries = kTRUE;
1755  if (!hname.IsNull()) {
1756  if (!gSystem->AccessPathName(cachedhname, kReadPermission)) {
1757  TMD5 *md5h = TMD5::FileChecksum(hname);
1758  TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
1759  if (md5h && md5hcache && (*md5h != *md5hcache))
1760  useCacheBinaries = kFALSE;
1761  SafeDelete(md5h);
1762  SafeDelete(md5hcache);
1763  }
1764  }
1765  SafeDelete(md5);
1766  SafeDelete(md5cache);
1767  }
1768 
1769  // Create version file name template
1770  TString vername(Form(".%s", name.Data()));
1771  dot = vername.Last('.');
1772  if (dot != kNPOS)
1773  vername.Remove(dot);
1774  vername += ".binversion";
1775  Bool_t savever = kFALSE;
1776 
1777  // Check binary version
1778  if (useCacheBinaries) {
1779  TString v, r;
1780  FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "r");
1781  if (f) {
1782  v.Gets(f);
1783  r.Gets(f);
1784  fclose(f);
1785  }
1786  if (!f || v != gROOT->GetVersion() || r != gROOT->GetGitCommit())
1787  useCacheBinaries = kFALSE;
1788  }
1789 
1790  // Create binary name template
1791  TString binname = gSystem->BaseName(name);
1792  dot = binname.Last('.');
1793  if (dot != kNPOS)
1794  binname.Replace(dot,1,"_");
1795  binname += ".";
1796 
1797  FileStat_t stlocal, stcache;
1798  void *dirp = 0;
1799  if (useCacheBinaries) {
1800  // Loop over binaries in the cache and copy them locally if newer then the local
1801  // versions or there is no local version
1802  dirp = gSystem->OpenDirectory(cacheDir);
1803  if (dirp) {
1804  const char *e = 0;
1805  while ((e = gSystem->GetDirEntry(dirp))) {
1806  if (!strncmp(e, binname.Data(), binname.Length())) {
1807  TString fncache = Form("%s/%s", cacheDir.Data(), e);
1808  Bool_t docp = kTRUE;
1809  if (!gSystem->GetPathInfo(fncache, stcache)) {
1810  Int_t rc = gSystem->GetPathInfo(e, stlocal);
1811  if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
1812  docp = kFALSE;
1813  // Copy the file, if needed
1814  if (docp) {
1815  gSystem->Exec(Form("%s %s", kRM, e));
1816  PDB(kGlobal,2)
1817  Info("CopyMacroToCache",
1818  "retrieving %s from cache", fncache.Data());
1819  gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
1820  }
1821  }
1822  }
1823  }
1824  gSystem->FreeDirectory(dirp);
1825  }
1826  }
1827  cacheLock->Unlock();
1828 
1829  if (selector) {
1830  // Now init the selector in optimized way
1831  if (!(*selector = TSelector::GetSelector(macro))) {
1832  Error("CopyMacroToCache", "could not create a selector from %s", macro);
1833  return -1;
1834  }
1835  }
1836 
1837  cacheLock->Lock();
1838 
1839  TList *cachedFiles = new TList;
1840  // Save information in the cache now for later usage
1841  dirp = gSystem->OpenDirectory(".");
1842  if (dirp) {
1843  const char *e = 0;
1844  while ((e = gSystem->GetDirEntry(dirp))) {
1845  if (!strncmp(e, binname.Data(), binname.Length())) {
1846  Bool_t docp = kTRUE;
1847  if (!gSystem->GetPathInfo(e, stlocal)) {
1848  TString fncache = Form("%s/%s", cacheDir.Data(), e);
1849  Int_t rc = gSystem->GetPathInfo(fncache, stcache);
1850  if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
1851  docp = kFALSE;
1852  // Copy the file, if needed
1853  if (docp) {
1854  gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
1855  PDB(kGlobal,2)
1856  Info("CopyMacroToCache","caching %s ...", e);
1857  gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
1858  savever = kTRUE;
1859  }
1860  if (opt & kCpBin)
1861  cachedFiles->Add(new TObjString(fncache.Data()));
1862  }
1863  }
1864  }
1865  gSystem->FreeDirectory(dirp);
1866  }
1867 
1868  // Save binary version if requested
1869  if (savever) {
1870  FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "w");
1871  if (f) {
1872  fputs(gROOT->GetVersion(), f);
1873  fputs(Form("\n%s", gROOT->GetGitCommit()), f);
1874  fclose(f);
1875  }
1876  }
1877 
1878  // Save also the selector info, if needed
1879  if (!useCacheBinaries) {
1880  gSystem->Exec(Form("%s %s", kRM, cachedname.Data()));
1881  PDB(kGlobal,2)
1882  Info("CopyMacroToCache","caching %s ...", name.Data());
1883  gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
1884  if (!hname.IsNull()) {
1885  gSystem->Exec(Form("%s %s", kRM, cachedhname.Data()));
1886  PDB(kGlobal,2)
1887  Info("CopyMacroToCache","caching %s ...", hname.Data());
1888  gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
1889  }
1890  }
1891  if (opt & kCp) {
1892  cachedFiles->Add(new TObjString(cachedname.Data()));
1893  if (!hname.IsNull())
1894  cachedFiles->Add(new TObjString(cachedhname.Data()));
1895  }
1896 
1897  cacheLock->Unlock();
1898 
1899  // Create symlinks
1900  if (opt & (kCp | kCpBin))
1901  CreateSymLinks(cachedFiles, wrks);
1902 
1903  cachedFiles->SetOwner();
1904  delete cachedFiles;
1905 
1906  return 0;
1907 }
1908 
1909 ////////////////////////////////////////////////////////////////////////////////
1910 /// Remove old sessions dirs keep at most 'Proof.MaxOldSessions' (default 10)
1911 
1913 {
1914  Int_t maxold = gEnv->GetValue("Proof.MaxOldSessions", 1);
1915 
1916  if (maxold < 0) return 0;
1917 
1918  TSortedList *olddirs = new TSortedList(kFALSE);
1919 
1920  TString sandbox = gSystem->DirName(fWorkDir.Data());
1921 
1922  void *dirp = gSystem->OpenDirectory(sandbox);
1923  if (dirp) {
1924  const char *e = 0;
1925  while ((e = gSystem->GetDirEntry(dirp))) {
1926  if (!strncmp(e, "session-", 8) && !strstr(e, GetName())) {
1927  TString d(e);
1928  Int_t i = d.Last('-');
1929  if (i != kNPOS) d.Remove(i);
1930  i = d.Last('-');
1931  if (i != kNPOS) d.Remove(0,i+1);
1932  TString path = Form("%s/%s", sandbox.Data(), e);
1933  olddirs->Add(new TNamed(d, path));
1934  }
1935  }
1936  gSystem->FreeDirectory(dirp);
1937  }
1938 
1939  // Clean it up, if required
1940  Bool_t notify = kTRUE;
1941  while (olddirs->GetSize() > maxold) {
1942  if (notify && gDebug > 0)
1943  Printf("Cleaning sandbox at: %s", sandbox.Data());
1944  notify = kFALSE;
1945  TNamed *n = (TNamed *) olddirs->Last();
1946  if (n) {
1947  gSystem->Exec(Form("%s %s", kRM, n->GetTitle()));
1948  olddirs->Remove(n);
1949  delete n;
1950  }
1951  }
1952 
1953  // Cleanup
1954  olddirs->SetOwner();
1955  delete olddirs;
1956 
1957  // Done
1958  return 0;
1959 }
1960 
1961 ////////////////////////////////////////////////////////////////////////////////
1962 /// Get the list of queries.
1963 
1965 {
1966  Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
1967 
1968  TList *ql = new TList;
1969  Int_t ntot = 0, npre = 0, ndraw= 0;
1970  if (fQMgr) {
1971  if (all) {
1972  // Rescan
1973  TString qdir = fQueryDir;
1974  Int_t idx = qdir.Index("session-");
1975  if (idx != kNPOS)
1976  qdir.Remove(idx);
1977  fQMgr->ScanPreviousQueries(qdir);
1978  // Gather also information about previous queries, if any
1979  if (fQMgr->PreviousQueries()) {
1980  TIter nxq(fQMgr->PreviousQueries());
1981  TProofQueryResult *pqr = 0;
1982  while ((pqr = (TProofQueryResult *)nxq())) {
1983  ntot++;
1984  pqr->fSeqNum = ntot;
1985  ql->Add(pqr);
1986  }
1987  }
1988  }
1989 
1990  npre = ntot;
1991  if (fQMgr->Queries()) {
1992  // Add info about queries in this session
1993  TIter nxq(fQMgr->Queries());
1994  TProofQueryResult *pqr = 0;
1995  TQueryResult *pqm = 0;
1996  while ((pqr = (TProofQueryResult *)nxq())) {
1997  ntot++;
1998  if ((pqm = pqr->CloneInfo())) {
1999  pqm->fSeqNum = ntot;
2000  ql->Add(pqm);
2001  } else {
2002  Warning("GetListOfQueries", "unable to clone TProofQueryResult '%s:%s'",
2003  pqr->GetName(), pqr->GetTitle());
2004  }
2005  }
2006  }
2007  // Number of draw queries
2008  ndraw = fQMgr->DrawQueries();
2009  }
2010 
2011  fOtherQueries = npre;
2012  fDrawQueries = ndraw;
2013  if (fQueries) {
2014  fQueries->Delete();
2015  delete fQueries;
2016  fQueries = 0;
2017  }
2018  fQueries = ql;
2019 
2020  // This should have been filled by now
2021  return fQueries;
2022 }
2023 
2024 ////////////////////////////////////////////////////////////////////////////////
2025 /// Register the 'dataSet' on the cluster under the current
2026 /// user, group and the given 'dataSetName'.
2027 /// Fails if a dataset named 'dataSetName' already exists, unless 'optStr'
2028 /// contains 'O', in which case the old dataset is overwritten.
2029 /// If 'optStr' contains 'V' the dataset files are verified (default no
2030 /// verification).
2031 /// Returns kTRUE on success.
2032 
2034  TFileCollection *dataSet, const char* optStr)
2035 {
2036  if (!fDataSetManager) {
2037  Info("RegisterDataSet", "dataset manager not available");
2038  return kFALSE;
2039  }
2040 
2041  if (!uri || strlen(uri) <= 0) {
2042  Info("RegisterDataSet", "specifying a dataset name is mandatory");
2043  return kFALSE;
2044  }
2045 
2046  Bool_t parallelverify = kFALSE;
2047  TString sopt(optStr);
2048  if (sopt.Contains("V") && !sopt.Contains("S")) {
2049  // We do verification in parallel later on; just register for now
2050  parallelverify = kTRUE;
2051  sopt.ReplaceAll("V", "");
2052  }
2053  // This would screw up things remotely, make sure is not there
2054  sopt.ReplaceAll("S", "");
2055 
2056  Bool_t result = kTRUE;
2058  // Check the list
2059  if (!dataSet || dataSet->GetList()->GetSize() == 0) {
2060  Error("RegisterDataSet", "can not save an empty list.");
2061  result = kFALSE;
2062  }
2063  // Register the dataset (quota checks are done inside here)
2064  result = (fDataSetManager->RegisterDataSet(uri, dataSet, sopt) == 0)
2065  ? kTRUE : kFALSE;
2066  } else {
2067  Info("RegisterDataSet", "dataset registration not allowed");
2068  result = kFALSE;
2069  }
2070 
2071  if (!result)
2072  Error("RegisterDataSet", "dataset was not saved");
2073 
2074  // If old server or not verifying in parallel we are done
2075  if (!parallelverify) return result;
2076 
2077  // If we are here it means that we will verify in parallel
2078  sopt += "V";
2079  if (VerifyDataSet(uri, sopt) < 0){
2080  Error("RegisterDataSet", "problems verifying dataset '%s'", uri);
2081  return kFALSE;
2082  }
2083 
2084  // Done
2085  return kTRUE;
2086 }
2087 
2088 ////////////////////////////////////////////////////////////////////////////////
2089 /// Set/Change the name of the default tree. The tree name may contain
2090 /// subdir specification in the form "subdir/name".
2091 /// Returns 0 on success, -1 otherwise.
2092 
2093 Int_t TProofLite::SetDataSetTreeName(const char *dataset, const char *treename)
2094 {
2095  if (!fDataSetManager) {
2096  Info("ExistsDataSet", "dataset manager not available");
2097  return kFALSE;
2098  }
2099 
2100  if (!dataset || strlen(dataset) <= 0) {
2101  Info("SetDataSetTreeName", "specifying a dataset name is mandatory");
2102  return -1;
2103  }
2104 
2105  if (!treename || strlen(treename) <= 0) {
2106  Info("SetDataSetTreeName", "specifying a tree name is mandatory");
2107  return -1;
2108  }
2109 
2110  TUri uri(dataset);
2111  TString fragment(treename);
2112  if (!fragment.BeginsWith("/")) fragment.Insert(0, "/");
2113  uri.SetFragment(fragment);
2114 
2115  return fDataSetManager->ScanDataSet(uri.GetUri().Data(),
2117 }
2118 
2119 ////////////////////////////////////////////////////////////////////////////////
2120 /// Returns kTRUE if 'dataset' described by 'uri' exists, kFALSE otherwise
2121 
2123 {
2124  if (!fDataSetManager) {
2125  Info("ExistsDataSet", "dataset manager not available");
2126  return kFALSE;
2127  }
2128 
2129  if (!uri || strlen(uri) <= 0) {
2130  Error("ExistsDataSet", "dataset name missing");
2131  return kFALSE;
2132  }
2133 
2134  // Check if the dataset exists
2135  return fDataSetManager->ExistsDataSet(uri);
2136 }
2137 
2138 ////////////////////////////////////////////////////////////////////////////////
2139 /// lists all datasets that match given uri
2140 
2141 TMap *TProofLite::GetDataSets(const char *uri, const char *srvex)
2142 {
2143  if (!fDataSetManager) {
2144  Info("GetDataSets", "dataset manager not available");
2145  return (TMap *)0;
2146  }
2147 
2148  // Get the datasets and return the map
2149  if (srvex && strlen(srvex) > 0) {
2150  return fDataSetManager->GetSubDataSets(uri, srvex);
2151  } else {
2153  return fDataSetManager->GetDataSets(uri, opt);
2154  }
2155 }
2156 
2157 ////////////////////////////////////////////////////////////////////////////////
2158 /// Shows datasets in locations that match the uri
2159 /// By default shows the user's datasets and global ones
2160 
2161 void TProofLite::ShowDataSets(const char *uri, const char *opt)
2162 {
2163  if (!fDataSetManager) {
2164  Info("GetDataSet", "dataset manager not available");
2165  return;
2166  }
2167 
2168  fDataSetManager->ShowDataSets(uri, opt);
2169 }
2170 
2171 ////////////////////////////////////////////////////////////////////////////////
2172 /// Get a list of TFileInfo objects describing the files of the specified
2173 /// dataset.
2174 
2175 TFileCollection *TProofLite::GetDataSet(const char *uri, const char *)
2176 {
2177  if (!fDataSetManager) {
2178  Info("GetDataSet", "dataset manager not available");
2179  return (TFileCollection *)0;
2180  }
2181 
2182  if (!uri || strlen(uri) <= 0) {
2183  Info("GetDataSet", "specifying a dataset name is mandatory");
2184  return 0;
2185  }
2186 
2187  // Return the list
2188  return fDataSetManager->GetDataSet(uri);
2189 }
2190 
2191 ////////////////////////////////////////////////////////////////////////////////
2192 /// Remove the specified dataset from the PROOF cluster.
2193 /// Files are not deleted.
2194 
2195 Int_t TProofLite::RemoveDataSet(const char *uri, const char *)
2196 {
2197  if (!fDataSetManager) {
2198  Info("RemoveDataSet", "dataset manager not available");
2199  return -1;
2200  }
2201 
2203  if (!fDataSetManager->RemoveDataSet(uri)) {
2204  // Failure
2205  return -1;
2206  }
2207  } else {
2208  Info("RemoveDataSet", "dataset creation / removal not allowed");
2209  return -1;
2210  }
2211 
2212  // Done
2213  return 0;
2214 }
2215 
2216 ////////////////////////////////////////////////////////////////////////////////
2217 /// Allows users to request staging of a particular dataset. Requests are
2218 /// saved in a special dataset repository and must be honored by the endpoint.
2219 /// This is the special PROOF-Lite re-implementation of the TProof function
2220 /// and includes code originally implemented in TProofServ.
2221 
2223 {
2224  if (!dataset) {
2225  Error("RequestStagingDataSet", "invalid dataset specified");
2226  return kFALSE;
2227  }
2228 
2229  if (!fDataSetStgRepo) {
2230  Error("RequestStagingDataSet", "no dataset staging request repository available");
2231  return kFALSE;
2232  }
2233 
2234  TString dsUser, dsGroup, dsName, dsTree;
2235 
2236  // Transform input URI in a valid dataset name
2237  TString validUri = dataset;
2238  while (fReInvalid->Substitute(validUri, "_")) {}
2239 
2240  // Check if dataset exists beforehand: if it does, staging has already been requested
2241  if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
2242  Warning("RequestStagingDataSet", "staging of %s already requested", dataset);
2243  return kFALSE;
2244  }
2245 
2246  // Try to get dataset from current manager
2248  if (!fc || (fc->GetNFiles() == 0)) {
2249  Error("RequestStagingDataSet", "empty dataset or no dataset returned");
2250  if (fc) delete fc;
2251  return kFALSE;
2252  }
2253 
2254  // Reset all staged bits and remove unnecessary URLs (all but last)
2255  TIter it(fc->GetList());
2256  TFileInfo *fi;
2257  while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
2259  Int_t nToErase = fi->GetNUrls() - 1;
2260  for (Int_t i=0; i<nToErase; i++)
2261  fi->RemoveUrlAt(0);
2262  }
2263 
2264  fc->Update(); // absolutely necessary
2265 
2266  // Save request
2267  fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
2268  if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser, dsName, fc) == 0) {
2269  // Error, can't save dataset
2270  Error("RequestStagingDataSet", "can't register staging request for %s", dataset);
2271  delete fc;
2272  return kFALSE;
2273  }
2274 
2275  Info("RequestStagingDataSet", "Staging request registered for %s", dataset);
2276  delete fc;
2277 
2278  return kTRUE;
2279 }
2280 
2281 ////////////////////////////////////////////////////////////////////////////////
2282 /// Cancels a dataset staging request. Returns kTRUE on success, kFALSE on
2283 /// failure. Dataset not found equals to a failure. PROOF-Lite
2284 /// re-implementation of the equivalent function in TProofServ.
2285 
2287 {
2288  if (!dataset) {
2289  Error("CancelStagingDataSet", "invalid dataset specified");
2290  return kFALSE;
2291  }
2292 
2293  if (!fDataSetStgRepo) {
2294  Error("CancelStagingDataSet", "no dataset staging request repository available");
2295  return kFALSE;
2296  }
2297 
2298  // Transform URI in a valid dataset name
2299  TString validUri = dataset;
2300  while (fReInvalid->Substitute(validUri, "_")) {}
2301 
2302  if (!fDataSetStgRepo->RemoveDataSet(validUri.Data()))
2303  return kFALSE;
2304 
2305  return kTRUE;
2306 }
2307 
2308 ////////////////////////////////////////////////////////////////////////////////
2309 /// Obtains a TFileCollection showing the staging status of the specified
2310 /// dataset. A valid dataset manager and dataset staging requests repository
2311 /// must be present on the endpoint. PROOF-Lite version of the equivalent
2312 /// function from TProofServ.
2313 
2315 {
2316  if (!dataset) {
2317  Error("GetStagingStatusDataSet", "invalid dataset specified");
2318  return 0;
2319  }
2320 
2321  if (!fDataSetStgRepo) {
2322  Error("GetStagingStatusDataSet", "no dataset staging request repository available");
2323  return 0;
2324  }
2325 
2326  // Transform URI in a valid dataset name
2327  TString validUri = dataset;
2328  while (fReInvalid->Substitute(validUri, "_")) {}
2329 
2330  // Get the list
2332  if (!fc) {
2333  // No such dataset (not an error)
2334  Info("GetStagingStatusDataSet", "no pending staging request for %s", dataset);
2335  return 0;
2336  }
2337 
2338  // Dataset found: return it (must be cleaned by caller)
2339  return fc;
2340 }
2341 
2342 ////////////////////////////////////////////////////////////////////////////////
2343 /// Verify if all files in the specified dataset are available.
2344 /// Print a list and return the number of missing files.
2345 
2346 Int_t TProofLite::VerifyDataSet(const char *uri, const char *optStr)
2347 {
2348  if (!fDataSetManager) {
2349  Info("VerifyDataSet", "dataset manager not available");
2350  return -1;
2351  }
2352 
2353  Int_t rc = -1;
2354  TString sopt(optStr);
2355  if (sopt.Contains("S")) {
2356 
2358  rc = fDataSetManager->ScanDataSet(uri);
2359  } else {
2360  Info("VerifyDataSet", "dataset verification not allowed");
2361  rc = -1;
2362  }
2363  return rc;
2364  }
2365 
2366  // Done
2367  return VerifyDataSetParallel(uri, optStr);
2368 }
2369 
2370 ////////////////////////////////////////////////////////////////////////////////
2371 /// Clear the content of the dataset cache, if any (matching 'dataset', if defined).
2372 
2373 void TProofLite::ClearDataSetCache(const char *dataset)
2374 {
2376  // Done
2377  return;
2378 }
2379 
2380 ////////////////////////////////////////////////////////////////////////////////
2381 /// Display the content of the dataset cache, if any (matching 'dataset', if defined).
2382 
2383 void TProofLite::ShowDataSetCache(const char *dataset)
2384 {
2385  // For PROOF-Lite act locally
2386  if (fDataSetManager) fDataSetManager->ShowCache(dataset);
2387  // Done
2388  return;
2389 }
2390 
2391 ////////////////////////////////////////////////////////////////////////////////
2392 /// Make sure that the input data objects are available to the workers in a
2393 /// dedicated file in the cache; the objects are taken from the dedicated list
2394 /// and / or the specified file.
2395 /// If the fInputData is empty the specified file is sent over.
2396 /// If there is no specified file, a file named "inputdata.root" is created locally
2397 /// with the content of fInputData and sent over to the master.
2398 /// If both fInputData and the specified file are not empty, a copy of the file
2399 /// is made locally and augmented with the content of fInputData.
2400 
2402 {
2403  // Prepare the file
2404  TString dataFile;
2405  PrepareInputDataFile(dataFile);
2406 
2407  // Make sure it is in the cache, if not empty
2408  if (dataFile.Length() > 0) {
2409 
2410  if (!dataFile.BeginsWith(fCacheDir)) {
2411  // Destination
2412  TString dst;
2413  dst.Form("%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
2414  // Remove it first if it exists
2415  if (!gSystem->AccessPathName(dst))
2416  gSystem->Unlink(dst);
2417  // Copy the file
2418  if (gSystem->CopyFile(dataFile, dst) != 0)
2419  Warning("SendInputDataFile", "problems copying '%s' to '%s'",
2420  dataFile.Data(), dst.Data());
2421  }
2422 
2423  // Set the name in the input list so that the workers can find it
2424  AddInput(new TNamed("PROOF_InputDataFile", Form("%s", gSystem->BaseName(dataFile))));
2425  }
2426 }
2427 
2428 ////////////////////////////////////////////////////////////////////////////////
2429 /// Handle remove request.
2430 
2432 {
2433  PDB(kGlobal, 1)
2434  Info("Remove", "Enter: %s, %d", ref, all);
2435 
2436  if (all) {
2437  // Remove also local copies, if any
2438  if (fPlayer)
2439  fPlayer->RemoveQueryResult(ref);
2440  }
2441 
2442  TString queryref(ref);
2443 
2444  if (queryref == "cleanupdir") {
2445 
2446  // Cleanup previous sessions results
2447  Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
2448 
2449  // Notify
2450  Info("Remove", "%d directories removed", nd);
2451  // We are done
2452  return 0;
2453  }
2454 
2455 
2456  if (fQMgr) {
2457  TProofLockPath *lck = 0;
2458  if (fQMgr->LockSession(queryref, &lck) == 0) {
2459 
2460  // Remove query
2461  fQMgr->RemoveQuery(queryref, 0);
2462 
2463  // Unlock and remove the lock file
2464  if (lck) {
2465  gSystem->Unlink(lck->GetName());
2466  SafeDelete(lck);
2467  }
2468 
2469  // We are done
2470  return 0;
2471  }
2472  } else {
2473  Warning("Remove", "query result manager undefined!");
2474  }
2475 
2476  // Notify failure
2477  Info("Remove",
2478  "query %s could not be removed (unable to lock session)", queryref.Data());
2479 
2480  // Done
2481  return -1;
2482 }
2483 
2484 ////////////////////////////////////////////////////////////////////////////////
2485 /// Creates a tree header (a tree with nonexisting files) object for
2486 /// the DataSet.
2487 
2489 {
2490  TTree *t = 0;
2491  if (!dset) {
2492  Error("GetTreeHeader", "undefined TDSet");
2493  return t;
2494  }
2495 
2496  dset->Reset();
2497  TDSetElement *e = dset->Next();
2498  Long64_t entries = 0;
2499  TFile *f = 0;
2500  if (!e) {
2501  PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
2502  } else {
2503  f = TFile::Open(e->GetFileName());
2504  t = 0;
2505  if (f) {
2506  t = (TTree*) f->Get(e->GetObjName());
2507  if (t) {
2508  t->SetMaxVirtualSize(0);
2509  t->DropBaskets();
2510  entries = t->GetEntries();
2511 
2512  // compute #entries in all the files
2513  while ((e = dset->Next()) != 0) {
2514  TFile *f1 = TFile::Open(e->GetFileName());
2515  if (f1) {
2516  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
2517  if (t1) {
2518  entries += t1->GetEntries();
2519  delete t1;
2520  }
2521  delete f1;
2522  }
2523  }
2524  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
2525  }
2526  }
2527  }
2528  // Done
2529  return t;
2530 }
2531 
2532 ////////////////////////////////////////////////////////////////////////////////
2533 /// Add to the fUniqueSlave list the active slaves that have a unique
2534 /// (user) file system image. This information is used to transfer files
2535 /// only once to nodes that share a file system (an image). Submasters
2536 /// which are not in fUniqueSlaves are put in the fNonUniqueMasters
2537 /// list. That list is used to trigger the transferring of files to
2538 /// the submaster's unique slaves without the need to transfer the file
2539 /// to the submaster.
2540 
2542 {
2543  fUniqueSlaves->Clear();
2548 
2549  if (fActiveSlaves->GetSize() <= 0) return;
2550 
2551  TSlave *wrk = dynamic_cast<TSlave*>(fActiveSlaves->First());
2552  if (!wrk) {
2553  Error("FindUniqueSlaves", "first object in fActiveSlaves not a TSlave: embarrasing!");
2554  return;
2555  }
2556  fUniqueSlaves->Add(wrk);
2557  fAllUniqueSlaves->Add(wrk);
2558  fUniqueMonitor->Add(wrk->GetSocket());
2559  fAllUniqueMonitor->Add(wrk->GetSocket());
2560 
2561  // will be actiavted in Collect()
2564 }
2565 
2566 ////////////////////////////////////////////////////////////////////////////////
2567 /// List contents of the data directory in the sandbox.
2568 /// This is the place where files produced by the client queries are kept
2569 
2571 {
2572  if (!IsValid()) return;
2573 
2574  // Get worker infos
2575  TList *wrki = GetListOfSlaveInfos();
2576  TSlaveInfo *wi = 0;
2577  TIter nxwi(wrki);
2578  while ((wi = (TSlaveInfo *) nxwi())) {
2579  ShowDataDir(wi->GetDataDir());
2580  }
2581 }
2582 
2583 ////////////////////////////////////////////////////////////////////////////////
2584 /// List contents of the data directory 'dirname'
2585 
2586 void TProofLite::ShowDataDir(const char *dirname)
2587 {
2588  if (!dirname) return;
2589 
2590  FileStat_t dirst;
2591  if (gSystem->GetPathInfo(dirname, dirst) != 0) return;
2592  if (!R_ISDIR(dirst.fMode)) return;
2593 
2594  void *dirp = gSystem->OpenDirectory(dirname);
2595  TString fn;
2596  const char *ent = 0;
2597  while ((ent = gSystem->GetDirEntry(dirp))) {
2598  fn.Form("%s/%s", dirname, ent);
2599  FileStat_t st;
2600  if (gSystem->GetPathInfo(fn.Data(), st) == 0) {
2601  if (R_ISREG(st.fMode)) {
2602  Printf("lite:0| %s", fn.Data());
2603  } else if (R_ISREG(st.fMode)) {
2604  ShowDataDir(fn.Data());
2605  }
2606  }
2607  }
2608  // Done
2609  return;
2610 }
2611 
2612 ////////////////////////////////////////////////////////////////////////////////
2613 /// Simulate dynamic addition, for test purposes.
2614 /// Here we decide how many workers to add, we create them and set the
2615 /// environment.
2616 /// This call is called regularly by Collect if the opton is enabled.
2617 /// Returns the number of new workers added, or <0 on errors.
2618 
2620 {
2621  // Max workers
2622  if (fDynamicStartupNMax <= 0) {
2623  SysInfo_t si;
2624  if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
2626  } else {
2627  fDynamicStartupNMax = 2;
2628  }
2629  }
2630  if (fNWorkers >= fDynamicStartupNMax) {
2631  // Max reached: disable
2632  Info("PollForNewWorkers", "max reached: %d workers started", fNWorkers);
2634  return 0;
2635  }
2636 
2637  // Number of new workers
2638  Int_t nAdd = (fDynamicStartupStep > 0) ? fDynamicStartupStep : 1;
2639 
2640  // Create a monitor and add the socket to it
2641  TMonitor *mon = new TMonitor;
2642  mon->Add(fServSock);
2643 
2644  TList started;
2645  TSlave *wrk = 0;
2646  Int_t nWrksDone = 0, nWrksTot = -1;
2647  TString fullord;
2648 
2649  nWrksTot = fNWorkers + nAdd;
2650  // Now we create the worker applications which will call us back to finalize
2651  // the setup
2652  Int_t ord = fNWorkers;
2653  for (; ord < nWrksTot; ord++) {
2654 
2655  // Ordinal for this worker server
2656  fullord = Form("0.%d", ord);
2657 
2658  // Create environment files
2659  SetProofServEnv(fullord);
2660 
2661  // Create worker server and add to the list
2662  if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
2663  started.Add(wrk);
2664 
2665  PDB(kGlobal, 3)
2666  Info("PollForNewWorkers", "additional worker '%s' started", fullord.Data());
2667 
2668  // Notify
2669  NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
2670 
2671  } //end of worker loop
2672  fNWorkers = nWrksTot;
2673 
2674  // A list of TSlave objects for workers that are being added
2675  TList *addedWorkers = new TList();
2676  addedWorkers->SetOwner(kFALSE);
2677 
2678  // Wait for call backs
2679  nWrksDone = 0;
2680  nWrksTot = started.GetSize();
2681  Int_t nSelects = 0;
2682  Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
2683  while (started.GetSize() > 0 && nSelects < nWrksTot) {
2684 
2685  // Wait for activity on the socket for max 5 secs
2686  TSocket *xs = mon->Select(to);
2687 
2688  // Count attempts and check
2689  nSelects++;
2690  if (xs == (TSocket *) -1) continue;
2691 
2692  // Get the connection
2693  TSocket *s = fServSock->Accept();
2694  if (s && s->IsValid()) {
2695  // Receive ordinal
2696  TMessage *msg = 0;
2697  if (s->Recv(msg) < 0) {
2698  Warning("PollForNewWorkers", "problems receiving message from accepted socket!");
2699  } else {
2700  if (msg) {
2701  *msg >> fullord;
2702  // Find who is calling back
2703  if ((wrk = (TSlave *) started.FindObject(fullord))) {
2704  // Remove it from the started list
2705  started.Remove(wrk);
2706 
2707  // Assign tis socket the selected worker
2708  wrk->SetSocket(s);
2709  // Remove socket from global TROOT socket list. Only the TProof object,
2710  // representing all worker sockets, will be added to this list. This will
2711  // ensure the correct termination of all proof servers in case the
2712  // root session terminates.
2714  gROOT->GetListOfSockets()->Remove(s);
2715  }
2716  if (wrk->IsValid()) {
2717  // Set the input handler
2718  wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
2719  // Set fParallel to 1 for workers since they do not
2720  // report their fParallel with a LOG_DONE message
2721  wrk->fParallel = 1;
2722  // Finalize setup of the server
2723  wrk->SetupServ(TSlave::kSlave, 0);
2724  }
2725 
2726  // Monitor good workers
2727  fSlaves->Add(wrk);
2728  if (wrk->IsValid()) {
2729  fActiveSlaves->Add(wrk); // Is this required? Check!
2730  fAllMonitor->Add(wrk->GetSocket());
2731  // Record also in the list for termination
2732  if (addedWorkers) addedWorkers->Add(wrk);
2733  // Notify startup operations
2734  NotifyStartUp("Setting up added worker servers", ++nWrksDone, nWrksTot);
2735  } else {
2736  // Flag as bad
2737  fBadSlaves->Add(wrk);
2738  }
2739  }
2740  } else {
2741  Warning("PollForNewWorkers", "received empty message from accepted socket!");
2742  }
2743  }
2744  }
2745  }
2746 
2747  // Cleanup the monitor and the server socket
2748  mon->DeActivateAll();
2749  delete mon;
2750 
2751  Broadcast(kPROOF_GETSTATS, addedWorkers);
2752  Collect(addedWorkers, fCollectTimeout);
2753 
2754  // Update group view
2755  // SendGroupView();
2756 
2757  // By default go into parallel mode
2758  // SetParallel(-1, 0);
2759  SendCurrentState(addedWorkers);
2760 
2761  // Set worker processing environment
2762  SetupWorkersEnv(addedWorkers, kTRUE);
2763 
2764  // We are adding workers dynamically to an existing process, we
2765  // should invoke a special player's Process() to set only added workers
2766  // to the proper state
2767  if (fPlayer) {
2768  PDB(kGlobal, 3)
2769  Info("PollForNewWorkers", "Will send the PROCESS message to selected workers");
2770  fPlayer->JoinProcess(addedWorkers);
2771  }
2772 
2773  // Cleanup fwhat remained from startup
2774  Collect(addedWorkers);
2775 
2776  // Activate
2777  TIter naw(addedWorkers);
2778  while ((wrk = (TSlave *)naw())) {
2779  fActiveMonitor->Add(wrk->GetSocket());
2780  }
2781  // Cleanup
2782  delete addedWorkers;
2783 
2784  // Done
2785  return nWrksDone;
2786 }
Int_t SetProofServEnv(const char *ord)
Create environment files for worker 'ord'.
Definition: TProofLite.cxx:720
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
void SetQueryRunning(TProofQueryResult *pq)
Set query in running state.
Int_t GetNumberOfUniqueSlaves() const
Return number of unique slaves, i.e.
Definition: TProof.cxx:2012
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:912
virtual Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)=0
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1213
Int_t VerifyDataSet(const char *uri, const char *=0)
Verify if all files in the specified dataset are available.
This class starts a PROOF session on the local machine: no daemons, client and master merged...
Definition: TProofLite.h:42
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:851
Bool_t RequestStagingDataSet(const char *dataset)
Allows users to request staging of a particular dataset.
Long64_t GetNFiles() const
virtual TList * GetInputList() const =0
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
Definition: TSystem.cxx:3994
TQueryResultManager * fQMgr
Definition: TProofLite.h:64
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:711
TString fPerfTree
Definition: TProof.h:591
Bool_t IsDraw() const
Definition: TQueryResult.h:152
TString fPackageDir
Definition: TProof.h:563
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:405
void ActivateAsyncInput()
Activate the a-sync input handler.
Definition: TProof.cxx:4411
void AskParallel()
Ask the for the number of parallel slaves.
Definition: TProof.cxx:2084
TPMERegexp * fReInvalid
Definition: TProofLite.h:69
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:108
Int_t GetSeqNum() const
Definition: TQueryResult.h:123
void ShowDataSetCache(const char *dataset=0)
Display the content of the dataset cache, if any (matching 'dataset', if defined).
TMonitor * fAllUniqueMonitor
Definition: TProof.h:517
virtual Int_t ClearCache(const char *uri)
Clear cached information matching uri.
virtual void AddInput(TObject *inp)=0
long long Long64_t
Definition: RtypesCore.h:69
TFileCollection * GetDataSet(const char *uri, const char *=0)
Get a list of TFileInfo objects describing the files of the specified dataset.
const char * GetDataDir() const
Definition: TProof.h:255
TSocket * GetSocket() const
Definition: TSlave.h:138
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:394
Bool_t IsValid() const
Definition: TProof.h:973
void PrepareInputDataFile(TString &dataFile)
Prepare the file with the input data objects to be sent the master; the objects are taken from the de...
Definition: TProof.cxx:10217
Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Copy the specified macro in the cache directory.
void SetPerfTree(const char *pf="perftree.root", Bool_t withWrks=kFALSE)
Enable/Disable saving of the performance tree.
Definition: TProof.cxx:13203
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:865
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:473
TString fConfDir
Definition: TProof.h:603
Bool_t ExistsDataSet(const char *uri)
Returns kTRUE if 'dataset' described by 'uri' exists, kFALSE otherwise.
const char *const kCP
Definition: TProof.h:167
const char *const kLS
Definition: TProof.h:169
Ssiz_t Length() const
Definition: TString.h:390
Int_t fOtherQueries
Definition: TProof.h:554
Collectable string class.
Definition: TObjString.h:32
float Float_t
Definition: RtypesCore.h:53
TMonitor * mon
Definition: hserv2.C:32
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
Definition: TProof.cxx:10789
const char Option_t
Definition: RtypesCore.h:62
virtual ~TProofLite()
Destructor.
Definition: TProofLite.cxx:401
Bool_t fSync
Definition: TProof.h:537
virtual const char * GetBuildArch() const
Return the build architecture.
Definition: TSystem.cxx:3638
Bool_t RegisterDataSet(const char *dsName, TFileCollection *ds, const char *opt="")
Register the 'dataSet' on the cluster under the current user, group and the given 'dataSetName'...
virtual Bool_t IsValid() const
Definition: TSocket.h:162
TFileCollection * GetStagingStatusDataSet(const char *dataset)
Obtains a TFileCollection showing the staging status of the specified dataset.
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool=kFALSE)
Set up packages, loaded macros, include and lib paths ...
Definition: TProof.cxx:1541
void SendInputDataFile()
Make sure that the input data objects are available to the workers in a dedicated file in the cache; ...
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1311
virtual Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual void SetName(const char *name)
Change (i.e.
Definition: TNamed.cxx:128
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TH1 * h
Definition: legend2.C:5
void SetParameter(const char *par, const char *value)
Set input list parameter.
Definition: TProof.cxx:10400
virtual Bool_t RemoveDataSet(const char *uri)
Removes the indicated dataset.
The PROOF manager interacts with the PROOF server coordinator to create or destroy a PROOF session...
Definition: TProofMgr.h:53
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
static const TList * GetEnvVars()
Get environemnt variables.
Definition: TProof.cxx:12329
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
virtual EExitStatus GetExitStatus() const =0
TList * fAllUniqueSlaves
Definition: TProof.h:513
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:821
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Definition: TSystem.cxx:536
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
Definition: TSystem.cxx:873
TString fSelection
Definition: TProofLite.h:60
TString fImage
Definition: TProof.h:604
virtual TFileCollection * GetDataSet(const char *uri, const char *server=0)
Utility function used in various methods for user dataset upload.
Int_t PollForNewWorkers()
Simulate dynamic addition, for test purposes.
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
void SetSocket(TSocket *s)
Definition: TSlave.h:116
#define R__ASSERT(e)
Definition: TError.h:98
#define gROOT
Definition: TROOT.h:344
TString fLogFileName
Definition: TProof.h:542
TProofLockPath * fCacheLock
Definition: TProofLite.h:62
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t LoadPlugin()
Load the plugin library for this handler.
virtual void SetCurrentQuery(TQueryResult *q)=0
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition: TSystem.cxx:1395
TList * fQueries
Definition: TProof.h:553
The TEnv class reads config files, by default named .rootrc.
Definition: TEnv.h:128
Basic string class.
Definition: TString.h:137
void ClearDataSetCache(const char *dataset=0)
Clear the content of the dataset cache, if any (matching 'dataset', if defined).
Int_t SetDataSetTreeName(const char *dataset, const char *treename)
Set/Change the name of the default tree.
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:980
bool Bool_t
Definition: RtypesCore.h:59
void NotifyStartUp(const char *action, Int_t done, Int_t tot)
Notify setting-up operation message.
Definition: TProofLite.cxx:703
TQueryResult * CloneInfo()
Return an instance of TQueryResult containing only the local info fields, i.e.
TList * fUniqueSlaves
Definition: TProof.h:512
virtual Bool_t JoinProcess(TList *workers)=0
R__EXTERN TVirtualMutex * gROOTMutex
Definition: TROOT.h:63
const Bool_t kFALSE
Definition: Rtypes.h:92
TList * fWaitingSlaves
Definition: TProof.h:552
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
Definition: TProof.cxx:2482
const char *const kRM
Definition: TProof.h:168
Int_t GetNumberOfBadSlaves() const
Return number of bad slaves.
Definition: TProof.cxx:2021
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
virtual void ShowDataSets(const char *uri="*", const char *opt="")
Prints formatted information about the dataset 'uri'.
const char *const kPROOF_PackageLockFile
Definition: TProof.h:158
This class represents a RFC 3986 compatible URI.
Definition: TUri.h:39
Int_t DrawQueries() const
Long_t fMtime
Definition: TSystem.h:142
void Print(Option_t *option="") const
Print status of PROOF-Lite cluster.
Definition: TProofLite.cxx:992
static Int_t fgWrksMax
Definition: TProofLite.h:71
R__EXTERN TApplication * gApplication
Definition: TApplication.h:171
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
void ShowData()
List contents of the data directory in the sandbox.
void ShowDataDir(const char *dirname)
List contents of the data directory 'dirname'.
Long_t ExecPlugin(int nargs, const T &...params)
void ScanPreviousQueries(const char *dir)
Scan the queries directory for the results of previous queries.
TLatex * t1
Definition: textangle.C:20
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
Long64_t GetBytesRead() const
Definition: TProof.h:965
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:592
TList * fInputData
Definition: TProof.h:569
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
Definition: TProof.cxx:6756
TSignalHandler * GetSignalHandler() const
Definition: TApplication.h:112
TVirtualProofPlayer * fPlayer
Definition: TProof.h:525
TFile * f
Bool_t R_ISREG(Int_t mode)
Definition: TSystem.h:129
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
void ResolveKeywords(TString &s, const char *ord, const char *logfile)
Resolve some keywords in 's' <logfilewrk>, <user>, <rootsys>, <cpupin>
Definition: TProofLite.cxx:845
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Definition: TString.h:625
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2446
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
Int_t fMode
Definition: TSystem.h:138
const char * GetObjName() const
Definition: TDSet.h:122
Bool_t fForkStartup
Definition: TProofLite.h:54
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
Int_t SavePerfTree(const char *pf=0, const char *qref=0)
Save performance information from TPerfStats to file 'pf'.
Definition: TProof.cxx:13225
TDataSetManagerFile * fDataSetStgRepo
Definition: TProofLite.h:67
const char * Data() const
Definition: TString.h:349
TSignalHandler * fIntHandler
Definition: TProof.h:522
Manages an element of a TDSet.
Definition: TDSet.h:68
Int_t fDrawQueries
Definition: TProof.h:555
TList * fChains
Definition: TProof.h:527
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:839
Int_t Update(Long64_t avgsize=-1)
Update accumulated information about the elements of the collection (e.g.
virtual const char * GetDirEntry(void *dirp)
Get a directory entry. Returns 0 if no more entries.
Definition: TSystem.cxx:847
#define SafeDelete(p)
Definition: RConfig.h:436
TPluginHandler * fProgressDialog
Definition: TProof.h:523
TList * fBadSlaves
Definition: TProof.h:608
virtual TList * GetOutputList() const =0
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1294
TString fDataSetDir
Definition: TProofLite.h:51
Int_t CreateSymLinks(TList *files, TList *wrks=0)
Create in each worker sandbox symlinks to the files in the list Used to make the cache information av...
Double_t dot(const TVector2 &v1, const TVector2 &v2)
Definition: CsgOps.cxx:333
Bool_t fSendGroupView
list returned by kPROOF_GETSLAVEINFO
Definition: TProof.h:505
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:75
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2321
#define PDB(mask, level)
Definition: TProofDebug.h:58
void UpdateDialog()
Final update of the progress dialog.
Definition: TProof.cxx:4354
TList * fEnabledPackagesOnClient
Definition: TProof.h:566
THashList * fGlobalPackageDirList
Definition: TProof.h:564
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:36
TDataSetManager * fDataSetManager
Definition: TProofLite.h:66
const char * ord
Definition: TXSlave.cxx:46
TSlave * CreateSlave(const char *url, const char *ord, Int_t perf, const char *image, const char *workdir)
Create a new TSlave of type TSlave::kSlave.
Definition: TProof.cxx:1860
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
TString fCacheDir
Definition: TProofLite.h:49
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:33
Long64_t Process(TDSet *dset, const char *sel, Option_t *o="", Long64_t nent=-1, Long64_t fst=0)
Process a data set (TDSet) using the specified selector (.C) file.
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
Definition: TProof.cxx:6117
const char *const kPROOF_QueryDir
Definition: TProof.h:154
UChar_t mod R__LOCKGUARD2(gSrvAuthenticateMutex)
Int_t Init(const char *masterurl, const char *conffile, const char *confdir, Int_t loglevel, const char *alias=0)
Start the PROOF environment.
Definition: TProofLite.cxx:153
virtual Int_t RegisterDataSet(const char *uri, TFileCollection *dataSet, const char *opt)
Register a dataset, perfoming quota checkings, if needed.
int d
Definition: tornado.py:11
void Init(TClassEdit::TInterpreterLookupHelper *helper)
Definition: TClassEdit.cxx:118
Int_t fParallel
Definition: TSlave.h:101
Bool_t CancelStagingDataSet(const char *dataset)
Cancels a dataset staging request.
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1575
TProofLockPath * fPackageLock
Definition: TProof.h:565
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2676
TList * GetListOfElements() const
Definition: TDSet.h:231
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e)
Register TFileCollections in 'out' as datasets according to the rules in 'in'.
Int_t ApplyMaxQueries(Int_t mxq)
Scan the queries directory and remove the oldest ones (and relative dirs, if empty) in such a way onl...
A sorted doubly linked list.
Definition: TSortedList.h:30
TString fConfFile
Definition: TProof.h:602
std::vector< std::vector< double > > Data
tuple np
Definition: multifit.py:30
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
Definition: TSystem.cxx:1511
const char * GetUser() const
Definition: TProof.h:942
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1951
TQueryResult * GetQueryResult(const char *ref=0)
Return pointer to the full TQueryResult instance owned by the player and referenced by 'ref'...
Definition: TProof.cxx:2155
Bool_t IsParallel() const
Definition: TProof.h:975
TList * fSlaves
Definition: TProof.h:606
Int_t fNotIdle
Definition: TProof.h:536
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
virtual TSocket * Accept(UChar_t Opt=0)
Accept a connection on a server socket.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
const char * GetWorkDir() const
Definition: TSlave.h:131
const Bool_t kSortDescending
Definition: TList.h:41
friend class TProofInputHandler
Definition: TProof.h:350
TList * fInactiveSlaves
Definition: TProof.h:511
A container class for query results.
Definition: TQueryResult.h:44
Int_t fCollectTimeout
Definition: TProof.h:617
Long64_t GetEntries() const
Definition: TQueryResult.h:130
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo's. In case of error return 0.
Definition: TProof.cxx:2328
Int_t fDynamicStartupNMax
Definition: TProofLite.h:57
static TList * fgProofEnvList
Definition: TProof.h:579
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
void Emit(const char *signal)
Acitvate signal without args.
Definition: TQObject.cxx:559
TString fQueryDir
Definition: TProofLite.h:50
void SetTermTime(Float_t termtime)
Definition: TQueryResult.h:108
TString fSandbox
Definition: TProofLite.h:48
A doubly linked list.
Definition: TList.h:47
TObject * GetEntryList() const
Definition: TDSet.h:251
Bool_t fRedirLog
Definition: TProof.h:541
TMonitor * fUniqueMonitor
Definition: TProof.h:516
const char *const kPROOF_ConfFile
Definition: TProof.h:148
Bool_t RemoveDataSet(const char *group, const char *user, const char *dsName)
Removes the indicated dataset.
Int_t RemoveDataSet(const char *uri, const char *=0)
Remove the specified dataset from the PROOF cluster.
virtual Int_t ReadFile(const char *fname, EEnvLevel level)
Read and parse the resource file for a certain level.
Definition: TEnv.cxx:595
const char *const kPROOF_QueryLockFile
Definition: TProof.h:159
Int_t GetNumberOfActiveSlaves() const
Return number of active slaves, i.e.
Definition: TProof.cxx:1994
TMonitor * fAllMonitor
Definition: TProof.h:609
virtual void AddQueryResult(TQueryResult *q)=0
void ShowDataSets(const char *uri="", const char *=0)
Shows datasets in locations that match the uri By default shows the user's datasets and global ones...
virtual TMap * GetSubDataSets(const char *uri, const char *excludeservers)
Partition dataset 'ds' accordingly to the servers.
tuple outfile
Definition: mrt.py:21
TThread * t[5]
Definition: threadsh1.C:13
TString fUser
Definition: TSystem.h:152
Int_t fLogLevel
Definition: TProof.h:500
TList * fActiveSlaves
Definition: TProof.h:508
Long64_t DrawSelect(TDSet *dset, const char *varexp, const char *selection="", Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Execute the specified drawing action on a data set (TDSet).
void QueryResultReady(const char *ref)
Notify availability of a query result.
Definition: TProof.cxx:9947
Int_t fNWorkers
Definition: TProofLite.h:47
Bool_t fValid
Definition: TProof.h:495
const char * GetFileName() const
Definition: TDSet.h:113
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition: TSystem.cxx:1559
Int_t fCpus
Definition: TSystem.h:165
ROOT::R::TRInterface & r
Definition: Object.C:4
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2207
Class managing the query-result area.
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
TString fWorkDir
Definition: TProof.h:498
const TString GetUri() const
Returns the whole URI - an implementation of chapter 5.3 component recomposition. ...
Definition: TUri.cxx:139
SVector< double, 2 > v
Definition: Dict.h:5
const char *const kPROOF_DataSetDir
Definition: TProof.h:155
TMonitor * fCurrentMonitor
Definition: TProof.h:518
THashList * GetList()
Int_t SetupWorkers(Int_t opt=0, TList *wrks=0)
Start up PROOF workers.
Definition: TProofLite.cxx:525
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
Int_t GetLogLevel() const
Definition: TProof.h:952
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:675
const char *const kPROOF_ConfDir
Definition: TProof.h:149
Bool_t Gets(FILE *fp, Bool_t chop=kTRUE)
Read one line from the stream, including the , or until EOF.
Definition: Stringio.cxx:198
virtual Bool_t ExistsDataSet(const char *uri)
Checks if the indicated dataset exits.
Bool_t ParseUri(const char *uri, TString *dsGroup=0, TString *dsUser=0, TString *dsName=0, TString *dsTree=0, Bool_t onlyCurrent=kFALSE, Bool_t wildcards=kFALSE)
Parses a (relative) URI that describes a DataSet on the cluster.
Int_t fSeqNum
Definition: TProof.h:557
void ClearCache(const char *file=0)
Remove files from all file caches.
Int_t fDynamicStartupStep
Definition: TProofLite.h:56
virtual Int_t ShowCache(const char *uri)
Show cached information matching uri.
void SetActive(Bool_t=kTRUE)
Definition: TProof.h:1024
Int_t WriteDataSet(const char *group, const char *user, const char *dsName, TFileCollection *dataset, UInt_t option=0, TMD5 *checksum=0)
Writes indicated dataset.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2308
TList * fSlaveInfo
Definition: TProof.h:504
unsigned int UInt_t
Definition: RtypesCore.h:42
TList * GetInputList()
Definition: TQueryResult.h:128
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
TMarker * m
Definition: textangle.C:8
const char *const kPROOF_CacheLockFile
Definition: TProof.h:157
Int_t ScanDataSet(const char *uri, const char *opt)
Scans the dataset indicated by 'uri' following the 'opts' directives.
char * Form(const char *fmt,...)
void SetRunning(Int_t startlog, const char *par, Int_t nwrks)
Call when running starts.
bool first
Definition: line3Dfit.C:48
TServerSocket * fServSock
Definition: TProofLite.h:53
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Bool_t SetFragment(const TString &fragment)
Set fragment component of URI: fragment = *( pchar / "/" / "?" ).
Definition: TUri.cxx:497
TList * GetListOfQueries(Option_t *opt="")
Get the list of queries.
Bool_t fProgressDialogStarted
Definition: TProof.h:524
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition: TSystem.cxx:657
const Int_t kPROOF_Protocol
Definition: TProof.h:146
Int_t SendGroupView()
Send to all active slaves servers the current slave group size and their unique id.
Definition: TProof.cxx:6458
TStopwatch fQuerySTW
Definition: TProof.h:627
Bool_t FinalizeQuery(TProofQueryResult *pq, TProof *proof, TVirtualProofPlayer *player)
Final steps after Process() to complete the TQueryResult instance.
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
Definition: TString.cxx:430
Bool_t IsNull() const
Definition: TString.h:387
Int_t RemoveWorkers(TList *wrks)
Used for shuting down the workres after a query is finished.
Definition: TProof.cxx:1604
FILE * fLogFileR
Definition: TProof.h:544
virtual const char * GetBuildCompilerVersion() const
Return the build compiler version.
Definition: TSystem.cxx:3654
void SetName(const char *name)
Definition: TCollection.h:116
Int_t fProtocol
Definition: TProof.h:605
static Int_t GetNumberOfWorkers(const char *url=0)
Static method to determine the number of workers giving priority to users request.
Definition: TProofLite.cxx:432
Bool_t fLogToWindowOnly
Definition: TProof.h:545
TList * fFeedback
Definition: TProof.h:526
virtual void FreeDirectory(void *dirp)
Free a directory.
Definition: TSystem.cxx:839
void AddInput(TObject *obj)
Add objects that might be needed during the processing of the selector (see Process()).
Definition: TProof.cxx:10312
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2227
#define Printf
Definition: TGeoToOCC.h:18
TMap * GetDataSets(const char *uri="", const char *=0)
lists all datasets that match given uri
TList * fRunningDSets
Definition: TProof.h:615
Int_t VerifyDataSetParallel(const char *uri, const char *optStr)
Internal function for parallel dataset verification used TProof::VerifyDataSet and TProofLite::Verify...
Definition: TProof.cxx:11759
Bool_t fDynamicStartup
Definition: TProof.h:623
virtual void RemoveQueryResult(const char *ref)=0
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Definition: TSelector.cxx:140
Int_t AssertPath(const char *path, Bool_t writable)
Make sure that 'path' exists; if 'writable' is kTRUE, make also sure that the path is writable...
Definition: TProof.cxx:1282
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:581
Int_t CreateSandbox()
Create the sandbox for this session.
Definition: TProofLite.cxx:924
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
long Long_t
Definition: RtypesCore.h:50
Float_t GetRealTime() const
Definition: TProof.h:966
int Ssiz_t
Definition: RtypesCore.h:63
TString fSockPath
Definition: TProofLite.h:52
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action)
Extract from opt information about output handling settings.
Definition: TProof.cxx:4938
R__EXTERN TProof * gProof
Definition: TProof.h:1113
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:546
Bool_t fEndMaster
Definition: TProof.h:561
virtual Int_t GetSize() const
Definition: TCollection.h:95
void SetFeedback(TString &opt, TString &optfb, Int_t action)
Extract from opt in optfb information about wanted feedback settings.
Definition: TProof.cxx:5233
#define ClassImp(name)
Definition: Rtypes.h:279
tuple file
Definition: fildir.py:20
Int_t GoParallel(Int_t nodes, Bool_t accept=kFALSE, Bool_t random=kFALSE)
Go in parallel mode with at most "nodes" slaves.
Definition: TProof.cxx:7271
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
virtual int Symlink(const char *from, const char *to)
Create a symbolic link from file1 to file2.
Definition: TSystem.cxx:1285
Int_t Lock()
Locks the directory.
TList * fEnabledPackages
Definition: TProof.h:614
TList * fNonUniqueMasters
Definition: TProof.h:514
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
Int_t CleanupSandbox()
Remove old sessions dirs keep at most 'Proof.MaxOldSessions' (default 10)
TList * fRecvMessages
Definition: TProof.h:503
const char * GetOrdinal() const
Definition: TProofServ.h:266
TNamed()
Definition: TNamed.h:40
TList * fEnabledPackagesOnCluster
Definition: TProof.h:567
int nentries
Definition: THbookFile.cxx:89
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1347
void SetRunStatus(ERunStatus rst)
Definition: TProof.h:710
void RemoveQuery(TQueryResult *qr, Bool_t soft=kFALSE)
Remove everything about query qr.
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:349
Int_t GetSandbox(TString &sb, Bool_t assert=kFALSE, const char *rc=0)
Set the sandbox path from ' Proof.Sandbox' or the alternative var 'rc'.
Definition: TProof.cxx:1033
TQueryResult version adapted to PROOF neeeds.
static TMap * GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
Get a map {server-name, list-of-files} for collection 'fc' to be used in TPacketizerFile.
void SaveQuery(TProofQueryResult *qr, const char *fout=0)
Save current status of query 'qr' to file name fout.
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex 'this' was created with.
Definition: TPRegexp.cxx:704
#define name(a, b)
Definition: linkTestLib0.cpp:5
FILE * fLogFileW
Definition: TProof.h:543
Mother of all ROOT objects.
Definition: TObject.h:58
Float_t GetCpuTime() const
Definition: TProof.h:967
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:557
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition: TString.cxx:1793
TSelector * fSelector
Definition: TProof.h:625
Int_t GetNumberOfSlaves() const
Return number of slaves as described in the config file.
Definition: TProof.cxx:1985
TString fVarExp
Definition: TProofLite.h:59
Bool_t ExistsDataSet(const char *group, const char *user, const char *dsName)
Checks if the indicated dataset exits.
Int_t InitDataSetManager()
Initialize the dataset manager from directives or from defaults Return 0 on success, -1 on failure.
Bool_t R_ISDIR(Int_t mode)
Definition: TSystem.h:126
Bool_t IsIdle() const
Definition: TProof.h:976
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
Definition: TROOT.cxx:2480
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:360
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Definition: TPRegexp.h:103
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
TProofOutputList fOutputList
Definition: TProof.h:572
R__EXTERN const char * gRootDir
Definition: TSystem.h:233
TProofLockPath * fQueryLock
Definition: TProofLite.h:63
TFileCollection * GetDataSet(const char *uri, const char *srv=0)
Utility function used in various methods for user dataset upload.
TList * Queries() const
Int_t CleanupQueriesDir()
Remove all queries results referring to previous sessions.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
Int_t fSessionID
Definition: TProof.h:559
void AskStatistics()
Ask the for the statistics of the slaves.
Definition: TProof.cxx:2029
TF1 * f1
Definition: legend1.C:11
Int_t GetNumberOfInactiveSlaves() const
Return number of inactive slaves, i.e.
Definition: TProof.cxx:2003
TTree * GetTreeHeader(TDSet *tdset)
Creates a tree header (a tree with nonexisting files) object for the DataSet.
Int_t fStatus
Definition: TProof.h:501
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
Definition: TProof.cxx:7138
virtual int CopyFile(const char *from, const char *to, Bool_t overwrite=kFALSE)
Copy a file.
Definition: TSystem.cxx:1258
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:830
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:183
Int_t GetClientProtocol() const
Definition: TProof.h:950
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
void Reset()
Definition: TStopwatch.h:54
virtual Long64_t GetEntries() const
Definition: TTree.h:386
TList * fAvailablePackages
Definition: TProof.h:613
A TTree object has a header with a name and a title.
Definition: TTree.h:98
double result[121]
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
Int_t SendInitialState()
Transfer the initial (i.e.
Definition: TProof.cxx:6772
void ResetBit(UInt_t f)
Definition: TObject.h:172
const AParamType & GetVal() const
Definition: TParameter.h:77
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1191
TList * fTerminatedSlaveInfos
Definition: TProof.h:607
virtual Bool_t IsValid() const
Definition: TSlave.h:154
void Add(TObject *obj)
TProofQueryResult * MakeQueryResult(Long64_t nent, const char *opt, Long64_t fst, TDSet *dset, const char *selec)
Create a TProofQueryResult instance for this query.
TList * fLoadedMacros
Definition: TProof.h:578
Int_t Remove(const char *ref, Bool_t all)
Handle remove request.
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:582
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client...
Definition: TProof.cxx:9209
Int_t Substitute(TString &s, const TString &r, Bool_t doDollarSubst=kTRUE)
Substitute matching part of s with r, dollar back-ref substitution is performed if doDollarSubst is t...
Definition: TPRegexp.cxx:870
Class describing a PROOF worker server.
Definition: TSlave.h:50
void FindUniqueSlaves()
Add to the fUniqueSlave list the active slaves that have a unique (user) file system image...
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:39
const Bool_t kTRUE
Definition: Rtypes.h:91
void ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst, Long64_t ent)
Reset progress dialog.
Definition: TProof.cxx:9877
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
TList * PreviousQueries() const
Bool_t fTty
Definition: TProof.h:496
virtual TMap * GetDataSets(const char *uri, UInt_t=TDataSetManager::kExport)
Returns all datasets for the <group> and <user> specified by <uri>.
void ShowCache(Bool_t all=kFALSE)
List contents of file cache.
virtual char * ConcatFileName(const char *dir, const char *name)
Concatenate a directory and a file name. User must delete returned string.
Definition: TSystem.cxx:1028
tuple all
Definition: na49view.py:13
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Definition: TProof.cxx:2311
const Int_t n
Definition: legend1.C:16
virtual Int_t SetupServ(Int_t stype, const char *conffile)
Init a PROOF slave object.
Definition: TSlave.cxx:179
TMonitor * fActiveMonitor
Definition: TProof.h:515
virtual TList * GetListOfResults() const =0
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2360
Long64_t fLastPollWorkers_s
Definition: TProof.h:507
Int_t fMaxDrawQueries
Definition: TProof.h:556
Int_t CopyMacroToCache(const char *macro, Int_t headerRequired=0, TSelector **selector=0, Int_t opt=0, TList *wrks=0)
Copy a macro, and its possible associated .h[h] file, to the cache directory, from where the workers ...
const char *const kPROOF_CacheDir
Definition: TProof.h:151
const char *const kPROOF_PackDir
Definition: TProof.h:152
void SetInputHandler(TFileHandler *ih)
Adopt and register input handler for this slave.
Definition: TSlave.cxx:394
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.
Definition: TProof.cxx:12593
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904