ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TProofPlayer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofPlayer
13 \ingroup proofkernel
14 
15 Internal class steering processing in PROOF.
16 Instances of the TProofPlayer class are created on the worker nodes
17 per session and do the processing.
18 Instances of its subclass - TProofPlayerRemote - are created for each
19 query on the master(s) and on the client. On the master(s),
20 TProofPlayerRemote coordinates processing, checks the dataset, creates
21 the packetizer and takes care of merging the results of the workers.
22 The instance on the client collects information on the input
23 (dataset and selector), it invokes the Begin() method and finalizes
24 the query by calling Terminate().
25 
26 */
27 
28 #include "TProofDraw.h"
29 #include "TProofPlayer.h"
30 #include "THashList.h"
31 #include "TEnv.h"
32 #include "TEventIter.h"
33 #include "TVirtualPacketizer.h"
34 #include "TSelector.h"
35 #include "TSocket.h"
36 #include "TProofServ.h"
37 #include "TProof.h"
38 #include "TProofOutputFile.h"
39 #include "TProofSuperMaster.h"
40 #include "TSlave.h"
41 #include "TClass.h"
42 #include "TROOT.h"
43 #include "TError.h"
44 #include "TException.h"
45 #include "MessageTypes.h"
46 #include "TMessage.h"
47 #include "TDSetProxy.h"
48 #include "TString.h"
49 #include "TSystem.h"
50 #include "TFile.h"
51 #include "TFileCollection.h"
52 #include "TFileInfo.h"
53 #include "TFileMerger.h"
54 #include "TProofDebug.h"
55 #include "TTimer.h"
56 #include "TMap.h"
57 #include "TPerfStats.h"
58 #include "TStatus.h"
59 #include "TEventList.h"
60 #include "TProofLimitsFinder.h"
61 #include "THashList.h"
62 #include "TSortedList.h"
63 #include "TTree.h"
64 #include "TEntryList.h"
65 #include "TDSet.h"
66 #include "TDrawFeedback.h"
67 #include "TNamed.h"
68 #include "TObjString.h"
69 #include "TQueryResult.h"
70 #include "TMD5.h"
71 #include "TMethodCall.h"
72 #include "TObjArray.h"
73 #include "TH1.h"
74 #include "TVirtualMonitoring.h"
75 #include "TParameter.h"
77 #include "TStopwatch.h"
78 
79 // Timeout exception
80 #define kPEX_STOPPED 1001
81 #define kPEX_ABORTED 1002
82 
83 // To flag an abort condition: use a local static variable to avoid
84 // warnings about problems with longjumps
85 static Bool_t gAbort = kFALSE;
86 
87 class TAutoBinVal : public TNamed {
88 private:
89  Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
90 
91 public:
92  TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
93  Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
94  {
95  fXmin = xmin; fXmax = xmax;
96  fYmin = ymin; fYmax = ymax;
97  fZmin = zmin; fZmax = zmax;
98  }
99  void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
100  Double_t& ymax, Double_t& zmin, Double_t& zmax)
101  {
102  xmin = fXmin; xmax = fXmax;
103  ymin = fYmin; ymax = fYmax;
104  zmin = fZmin; zmax = fZmax;
105  }
106 
107 };
108 
109 //
110 // Special timer to dispatch pending events while processing
111 ////////////////////////////////////////////////////////////////////////////////
112 
113 class TDispatchTimer : public TTimer {
114 private:
115  TProofPlayer *fPlayer;
116 
117 public:
118  TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
119 
120  Bool_t Notify();
121 };
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Handle expiration of the timer associated with dispatching pending
124 /// events while processing. We must act as fast as possible here, so
125 /// we just set a flag submitting a request for dispatching pending events
126 
128 {
129  if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
130 
131  fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
132 
133  // Needed for the next shot
134  Reset();
135  return kTRUE;
136 }
137 
138 //
139 // Special timer to notify reach of max packet proc time
140 ////////////////////////////////////////////////////////////////////////////////
141 
142 class TProctimeTimer : public TTimer {
143 private:
144  TProofPlayer *fPlayer;
145 
146 public:
147  TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
148 
149  Bool_t Notify();
150 };
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Handle expiration of the timer associated with the maximum packet
153 /// processing time. We must act as fast as possible here, so
154 /// we just set a flag signalling that the maximum processing time was reached
155 
157 {
158  if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
159 
160  fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
161 
162  // One shot only
163  return kTRUE;
164 }
165 
166 //
167 // Special timer to handle stop/abort request via exception raising
168 ////////////////////////////////////////////////////////////////////////////////
169 
170 class TStopTimer : public TTimer {
171 private:
172  Bool_t fAbort;
173  TProofPlayer *fPlayer;
174 
175 public:
176  TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
177 
178  Bool_t Notify();
179 };
180 
181 ////////////////////////////////////////////////////////////////////////////////
182 /// Constructor for the timer to stop/abort processing.
183 /// The 'timeout' is in seconds.
184 /// Make sure that 'to' makes sense, i.e. not larger than 10 days;
185 /// the minimum value is 10 ms (0 does not seem to start the timer ...).
186 
187 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
188  : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
189 {
190  if (gDebug > 0)
191  Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
192 
193  fPlayer = p;
194  fAbort = abort;
195 
196  if (gDebug > 1)
197  Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
198 }
199 
200 ////////////////////////////////////////////////////////////////////////////////
201 /// Handle the signal coming from the expiration of the timer
202 /// associated with an abort or stop request.
203 /// We raise an exception which will be processed in the
204 /// event loop.
205 
207 {
208  if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
209 
210  if (fAbort)
212  else
214 
215  return kTRUE;
216 }
217 
218 //------------------------------------------------------------------------------
219 
221 
222 THashList *TProofPlayer::fgDrawInputPars = 0;
223 
224 ////////////////////////////////////////////////////////////////////////////////
225 /// Default ctor.
226 
228  : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
229  fFeedbackTimer(0), fFeedbackPeriod(2000),
230  fEvIter(0), fSelStatus(0),
231  fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
232  fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
233  fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
234  fProcTimeTimer(0), fProcTime(0),
235  fOutputFile(0),
236  fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
237 {
238  fInput = new TList;
239  fExitStatus = kFinished;
240  fProgressStatus = new TProofProgressStatus();
242  ResetBit(TProofPlayer::kIsProcessing);
245 
246  static Bool_t initLimitsFinder = kFALSE;
247  if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
249  initLimitsFinder = kTRUE;
250  }
251 
252 }
253 
254 ////////////////////////////////////////////////////////////////////////////////
255 /// Destructor.
256 
258 {
259  fInput->Clear("nodelete");
261  // The output list is owned by fSelector and destroyed in there
262  SafeDelete(fSelector);
263  SafeDelete(fFeedbackTimer);
264  SafeDelete(fEvIter);
265  SafeDelete(fQueryResults);
266  SafeDelete(fDispatchTimer);
267  SafeDelete(fProcTimeTimer);
268  SafeDelete(fProcTime);
269  SafeDelete(fStopTimer);
270 }
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Set processing bit according to 'on'
274 
276 {
277  if (on)
279  else
281 }
282 
283 ////////////////////////////////////////////////////////////////////////////////
284 /// Stop the process after this event. If timeout is positive, start
285 /// a timer firing after timeout seconds to hard-stop time-expensive
286 /// events.
287 
289 {
290  if (gDebug > 0)
291  Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
292 
293  if (fEvIter != 0)
294  fEvIter->StopProcess(abort);
295  Long_t to = 1;
296  if (abort == kTRUE) {
298  } else {
300  to = timeout;
301  }
302  // Start countdown, if needed
303  if (to > 0)
304  SetStopTimer(kTRUE, abort, to);
305 }
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Enable/disable the timer to dispatch pending events while processing.
309 
311 {
312  SafeDelete(fDispatchTimer);
314  if (on) {
315  fDispatchTimer = new TDispatchTimer(this);
316  fDispatchTimer->Start();
317  }
318 }
319 
320 ////////////////////////////////////////////////////////////////////////////////
321 /// Enable/disable the timer to stop/abort processing.
322 /// The 'timeout' is in seconds.
323 
325 {
326  std::lock_guard<std::mutex> lock(fStopTimerMtx);
327 
328  // Clean-up the timer
329  SafeDelete(fStopTimer);
330  if (on) {
331  // create timer
332  fStopTimer = new TStopTimer(this, abort, timeout);
333  // Start the countdown
334  fStopTimer->Start();
335  if (gDebug > 0)
336  Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
337  (abort ? "ABORT" : "STOP"), timeout);
338  } else {
339  if (gDebug > 0)
340  Info ("SetStopTimer", "timer STOPPED");
341  }
342 }
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 /// Add query result to the list, making sure that there are no
346 /// duplicates.
347 
349 {
350  if (!q) {
351  Warning("AddQueryResult","query undefined - do nothing");
352  return;
353  }
354 
355  // Treat differently normal and draw queries
356  if (!(q->IsDraw())) {
357  if (!fQueryResults) {
358  fQueryResults = new TList;
359  fQueryResults->Add(q);
360  } else {
361  TIter nxr(fQueryResults);
362  TQueryResult *qr = 0;
363  TQueryResult *qp = 0;
364  while ((qr = (TQueryResult *) nxr())) {
365  // If same query, remove old version and break
366  if (*qr == *q) {
367  fQueryResults->Remove(qr);
368  delete qr;
369  break;
370  }
371  // Record position according to start time
372  if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
373  qp = qr;
374  }
375 
376  if (!qp) {
377  fQueryResults->AddFirst(q);
378  } else {
379  fQueryResults->AddAfter(qp, q);
380  }
381  }
382  } else if (IsClient()) {
383  // If max reached, eliminate first the oldest one
384  if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
385  TIter nxr(fQueryResults);
386  TQueryResult *qr = 0;
387  while ((qr = (TQueryResult *) nxr())) {
388  // If same query, remove old version and break
389  if (qr->IsDraw()) {
390  fDrawQueries--;
391  fQueryResults->Remove(qr);
392  delete qr;
393  break;
394  }
395  }
396  }
397  // Add new draw query
398  if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
399  fDrawQueries++;
400  if (!fQueryResults)
401  fQueryResults = new TList;
402  fQueryResults->Add(q);
403  }
404  }
405 }
406 
407 ////////////////////////////////////////////////////////////////////////////////
408 /// Remove all query result instances referenced by 'ref' from
409 /// the list of results.
410 
411 void TProofPlayer::RemoveQueryResult(const char *ref)
412 {
413  if (fQueryResults) {
414  TIter nxq(fQueryResults);
415  TQueryResult *qr = 0;
416  while ((qr = (TQueryResult *) nxq())) {
417  if (qr->Matches(ref)) {
418  fQueryResults->Remove(qr);
419  delete qr;
420  }
421  }
422  }
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Get the query result instance referenced by 'ref' from
427 /// the list of results.
428 
430 {
431  if (fQueryResults) {
432  if (ref && strlen(ref) > 0) {
433  TIter nxq(fQueryResults);
434  TQueryResult *qr = 0;
435  while ((qr = (TQueryResult *) nxq())) {
436  if (qr->Matches(ref))
437  return qr;
438  }
439  } else {
440  // Get last
441  return (TQueryResult *) fQueryResults->Last();
442  }
443  }
444 
445  // Nothing found
446  return (TQueryResult *)0;
447 }
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Set current query and save previous value.
451 
453 {
454  fPreviousQuery = fQuery;
455  fQuery = q;
456 }
457 
458 ////////////////////////////////////////////////////////////////////////////////
459 /// Add object to input list.
460 
462 {
463  fInput->Add(inp);
464 }
465 
466 ////////////////////////////////////////////////////////////////////////////////
467 /// Clear input list.
468 
470 {
471  fInput->Clear();
472 }
473 
474 ////////////////////////////////////////////////////////////////////////////////
475 /// Get output object by name.
476 
478 {
479  if (fOutput)
480  return fOutput->FindObject(name);
481  return 0;
482 }
483 
484 ////////////////////////////////////////////////////////////////////////////////
485 /// Get output list.
486 
488 {
489  TList *ol = fOutput;
490  if (!ol && fQuery)
491  ol = fQuery->GetOutputList();
492  return ol;
493 }
494 
495 ////////////////////////////////////////////////////////////////////////////////
496 /// Reinitialize fSelector using the selector files in the query result.
497 /// Needed when Finalize is called after a Process execution for the same
498 /// selector name.
499 
501 {
502  Int_t rc = 0;
503 
504  // Make sure we have a query
505  if (!qr) {
506  Info("ReinitSelector", "query undefined - do nothing");
507  return -1;
508  }
509 
510  // Selector name
511  TString selec = qr->GetSelecImp()->GetName();
512  if (selec.Length() <= 0) {
513  Info("ReinitSelector", "selector name undefined - do nothing");
514  return -1;
515  }
516 
517  // Find out if this is a standard selection used for Draw actions
518  Bool_t stdselec = TSelector::IsStandardDraw(selec);
519 
520  // Find out if this is a precompiled selector: in such a case we do not
521  // have the code in TMacros, so we must rely on local libraries
522  Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
523 
524  // If not, find out if it needs to be expanded
525  TString ipathold;
526  if (!stdselec && !compselec) {
527  // Check checksums for the versions of the selector files
528  Bool_t expandselec = kTRUE;
529  TString dir, ipath;
530  char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
531  if (selc) {
532  // Check checksums
533  TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
534  // Implementation files
535  md5icur = TMD5::FileChecksum(selc);
536  md5iold = qr->GetSelecImp()->Checksum();
537  // Header files
538  TString selh(selc);
539  Int_t dot = selh.Last('.');
540  if (dot != kNPOS) selh.Remove(dot);
541  selh += ".h";
543  md5hcur = TMD5::FileChecksum(selh);
544  md5hold = qr->GetSelecHdr()->Checksum();
545 
546  // If nothing has changed nothing to do
547  if (md5hcur && md5hold && md5icur && md5iold)
548  if (*md5hcur == *md5hold && *md5icur == *md5iold)
549  expandselec = kFALSE;
550 
551  SafeDelete(md5icur);
552  SafeDelete(md5hcur);
553  SafeDelete(md5iold);
554  SafeDelete(md5hold);
555  if (selc) delete [] selc;
556  }
557 
558  Bool_t ok = kTRUE;
559  // Expand selector files, if needed
560  if (expandselec) {
561 
562  ok = kFALSE;
563  // Expand files in a temporary directory
564  TUUID u;
565  dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
566  if (!(gSystem->MakeDirectory(dir))) {
567 
568  // Export implementation file
569  selec = Form("%s/%s",dir.Data(),selec.Data());
570  qr->GetSelecImp()->SaveSource(selec);
571 
572  // Export header file
573  TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
574  qr->GetSelecHdr()->SaveSource(seleh);
575 
576  // Adjust include path
577  ipathold = gSystem->GetIncludePath();
578  ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
579  gSystem->SetIncludePath(ipath.Data());
580 
581  ok = kTRUE;
582  }
583  }
584  TString opt(qr->GetOptions());
585  Ssiz_t id = opt.Last('#');
586  if (id != kNPOS && id < opt.Length() - 1)
587  selec += opt(id + 1, opt.Length());
588 
589  if (!ok) {
590  Info("ReinitSelector", "problems locating or exporting selector files");
591  return -1;
592  }
593  }
594 
595  // Cleanup previous stuff
596  SafeDelete(fSelector);
597  fSelectorClass = 0;
598 
599  // Init the selector now
600  Int_t iglevelsave = gErrorIgnoreLevel;
601  if (compselec)
602  // Silent error printout on first attempt
604 
605  if ((fSelector = TSelector::GetSelector(selec))) {
606  if (compselec)
607  gErrorIgnoreLevel = iglevelsave; // restore ignore level
608  fSelectorClass = fSelector->IsA();
609  fSelector->SetOption(qr->GetOptions());
610 
611  } else {
612  if (compselec) {
613  gErrorIgnoreLevel = iglevelsave; // restore ignore level
614  // Retry by loading first the libraries listed in TQueryResult, if any
615  if (strlen(qr->GetLibList()) > 0) {
616  TString sl(qr->GetLibList());
617  TObjArray *oa = sl.Tokenize(" ");
618  if (oa) {
619  Bool_t retry = kFALSE;
620  TIter nxl(oa);
621  TObjString *os = 0;
622  while ((os = (TObjString *) nxl())) {
623  TString lib = gSystem->BaseName(os->GetName());
624  if (lib != "lib") {
625  lib.ReplaceAll("-l", "lib");
626  if (gSystem->Load(lib) == 0)
627  retry = kTRUE;
628  }
629  }
630  // Retry now, if the case
631  if (retry)
632  fSelector = TSelector::GetSelector(selec);
633  }
634  }
635  }
636  if (!fSelector) {
637  if (compselec)
638  Info("ReinitSelector", "compiled selector re-init failed:"
639  " automatic reload unsuccessful:"
640  " please load manually the correct library");
641  rc = -1;
642  }
643  }
644  if (fSelector) {
645  // Draw needs to reinit temp histos
646  fSelector->SetInputList(qr->GetInputList());
647  if (stdselec) {
648  ((TProofDraw *)fSelector)->DefVar();
649  } else {
650  // variables may have been initialized in Begin()
651  fSelector->Begin(0);
652  }
653  }
654 
655  // Restore original include path, if needed
656  if (ipathold.Length() > 0)
657  gSystem->SetIncludePath(ipathold.Data());
658 
659  return rc;
660 }
661 
662 ////////////////////////////////////////////////////////////////////////////////
663 /// Incorporate output object (may not be used in this class).
664 
666 {
667  MayNotUse("AddOutputObject");
668  return -1;
669 }
670 
671 ////////////////////////////////////////////////////////////////////////////////
672 /// Incorporate output list (may not be used in this class).
673 
675 {
676  MayNotUse("AddOutput");
677 }
678 
679 ////////////////////////////////////////////////////////////////////////////////
680 /// Store output list (may not be used in this class).
681 
683 {
684  MayNotUse("StoreOutput");
685 }
686 
687 ////////////////////////////////////////////////////////////////////////////////
688 /// Store feedback list (may not be used in this class).
689 
691 {
692  MayNotUse("StoreFeedback");
693 }
694 
695 ////////////////////////////////////////////////////////////////////////////////
696 /// Report progress (may not be used in this class).
697 
698 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
699 {
700  MayNotUse("Progress");
701 }
702 
703 ////////////////////////////////////////////////////////////////////////////////
704 /// Report progress (may not be used in this class).
705 
706 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
707  Long64_t /*bytesread*/,
708  Float_t /*evtRate*/, Float_t /*mbRate*/,
709  Float_t /*evtrti*/, Float_t /*mbrti*/)
710 {
711  MayNotUse("Progress");
712 }
713 
714 ////////////////////////////////////////////////////////////////////////////////
715 /// Report progress (may not be used in this class).
716 
718 {
719  MayNotUse("Progress");
720 }
721 
722 ////////////////////////////////////////////////////////////////////////////////
723 /// Set feedback list (may not be used in this class).
724 
726 {
727  MayNotUse("Feedback");
728 }
729 
730 ////////////////////////////////////////////////////////////////////////////////
731 /// Draw feedback creation proxy. When accessed via TProof avoids
732 /// link dependency on libProofPlayer.
733 
735 {
736  return new TDrawFeedback(p);
737 }
738 
739 ////////////////////////////////////////////////////////////////////////////////
740 /// Set draw feedback option.
741 
743 {
744  if (f)
745  f->SetOption(opt);
746 }
747 
748 ////////////////////////////////////////////////////////////////////////////////
749 /// Delete draw feedback object.
750 
752 {
753  delete f;
754 }
755 
756 ////////////////////////////////////////////////////////////////////////////////
757 /// Save the partial results of this query to a dedicated file under the user
758 /// data directory. The file name has the form
759 /// <session_tag>.q<query_seq_num>.root
760 /// The file path and the file are created if not existing already.
761 /// Only objects in the outputlist not being TProofOutputFile are saved.
762 /// The packets list 'packets' is saved if given.
763 /// Trees not attached to any file are attached to the open file.
764 /// If 'queryend' is kTRUE everything is written out (TTrees included).
765 /// The actual saving action is controlled by 'force' and by fSavePartialResults /
766 /// fSaveResultsPerPacket:
767 ///
768 /// fSavePartialResults = kFALSE/kTRUE no-saving/saving
769 /// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
770 ///
771 /// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
772 /// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
773 /// are always saved and expensive calls to TSystem::GetProcInfo saved.
774 /// The switch fSaveResultsPerPacket is instead controlled by the user or admin
775 /// who can also force saving in all cases; parameter PROOF_SavePartialResults or
776 /// RC env ProofPlayer.SavePartialResults .
777 /// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
778 /// are ignored.
779 /// Return -1 in case of problems, 0 otherwise.
780 
782 {
783  Bool_t save = (force || (fSavePartialResults &&
784  (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
785  if (!save) {
786  PDB(kOutput, 2)
787  Info("SavePartialResults", "partial result saving disabled");
788  return 0;
789  }
790 
791  // Sanity check
792  if (!gProofServ) {
793  Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
794  return -1;
795  }
796  if (!fOutput) {
797  Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
798  return -1;
799  }
800 
801  PDB(kOutput, 1)
802  Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
803  queryend, force, fSavePartialResults, fSaveResultsPerPacket);
804 
805  // Get list of processed packets from the iterator
806  PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
807 
808  TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
809  PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
810  packets, (packets ? packets->GetSize(): -1));
811 
812  // Open the file
813  const char *oopt = "UPDATE";
814  // Check if the file has already been defined
815  TString baseName(fOutputFilePath);
816  if (fOutputFilePath.IsNull()) {
817  baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
818  if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
819  fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
821  } else {
822  fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
823  }
824  Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
825  oopt = "RECREATE";
826  }
827  // Open the file in write mode
828  if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
829  (fOutputFile && fOutputFile->IsZombie())) {
830  Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
831  SafeDelete(fOutputFile);
832  return -1;
833  }
834 
835  // Save current directory
836  TDirectory *curdir = gDirectory;
837  fOutputFile->cd();
838 
839  // Write first the packets list, if required
840  if (packets) {
841  TDirectory *packetsDir = fOutputFile->mkdir("packets");
842  if (packetsDir) packetsDir->cd();
844  fOutputFile->cd();
845  }
846 
847  Bool_t notempty = kFALSE;
848  // Write out the output list
849  TList torm;
850  TIter nxo(fOutput);
851  TObject *o = 0;
852  while ((o = nxo())) {
853  // Skip output file drivers
854  if (o->InheritsFrom(TProofOutputFile::Class())) continue;
855  // Skip control objets
856  if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
857  // Skip data members mapping
859  // Skip missing file info
860  if (!strcmp(o->GetName(), "MissingFiles")) continue;
861  // Trees need a special treatment
862  if (o->InheritsFrom("TTree")) {
863  TTree *t = (TTree *) o;
864  TDirectory *d = t->GetDirectory();
865  // If the tree is not attached to any file ...
866  if (!d || (d && !d->InheritsFrom("TFile"))) {
867  // ... we attach it
868  t->SetDirectory(fOutputFile);
869  }
870  if (t->GetDirectory() == fOutputFile) {
871  if (queryend) {
872  // ... we write it out
873  o->Write(0, TObject::kOverwrite);
874  // At least something in the file
875  notempty = kTRUE;
876  // Flag for removal from the outputlist
877  torm.Add(o);
878  // Prevent double-deletion attempts
879  t->SetDirectory(0);
880  } else {
881  // ... or we set in automatic flush mode
882  t->SetAutoFlush();
883  }
884  }
885  } else if (queryend || fSaveResultsPerPacket) {
886  // Save overwriting what's already there
887  o->Write(0, TObject::kOverwrite);
888  // At least something in the file
889  notempty = kTRUE;
890  // Flag for removal from the outputlist
891  if (queryend) torm.Add(o);
892  }
893  }
894 
895  // Restore previous directory
896  gDirectory = curdir;
897 
898  // Close the file if required
899  if (notempty) {
900  if (!fOutput->FindObject(baseName)) {
901  TProofOutputFile *po = 0;
902  // Get directions
903  TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
904  TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
905  if (nm && oname.BeginsWith("ds:")) {
906  oname.Replace(0, 3, "");
907  TString qtag =
909  oname.ReplaceAll("<qtag>", qtag);
910  // Create the TProofOutputFile for dataset creation
911  po = new TProofOutputFile(baseName, "DRO", oname.Data());
912  } else {
913  Bool_t hasddir = kFALSE;
914  // Create the TProofOutputFile for automatic merging
915  po = new TProofOutputFile(baseName, "M");
916  if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
917  if (gProofServ->IsTopMaster()) {
918  if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
919  TString dsrv;
921  TProofServ::FilterLocalroot(oname, dsrv);
922  oname.Insert(0, dsrv);
923  }
924  } else {
925  if (nm) {
926  // The name has been sent by the client: resolve local place holders
927  oname.ReplaceAll("<file>", baseName);
928  } else {
929  // We did not get any indication; the final file will be in the datadir on
930  // the top master and it will be resolved there
931  oname.Form("<datadir>/%s", baseName.Data());
932  hasddir = kTRUE;
933  }
934  }
935  po->SetOutputFileName(oname.Data());
936  if (hasddir)
937  // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
939  po->SetName(gSystem->BaseName(oname.Data()));
940  }
941  po->AdoptFile(fOutputFile);
942  fOutput->Add(po);
943  // Flag the nature of this file
945  }
946  }
947  fOutputFile->Close();
948  SafeDelete(fOutputFile);
949 
950  // If last call, cleanup the output list from objects saved to file
951  if (queryend && torm.GetSize() > 0) {
952  TIter nxrm(&torm);
953  while ((o = nxrm())) { fOutput->Remove(o); }
954  }
955  torm.SetOwner(kFALSE);
956 
957  PDB(kOutput, 1)
958  Info("SavePartialResults", "partial results saved to file");
959  // We are done
960  return 0;
961 }
962 
963 ////////////////////////////////////////////////////////////////////////////////
964 /// Make sure that a valid selector object is available.
965 /// Return -1 in case of problems, 0 otherwise
966 
967 Int_t TProofPlayer::AssertSelector(const char *selector_file)
968 {
969  if (selector_file && strlen(selector_file)) {
970  if (fCreateSelObj) SafeDelete(fSelector);
971 
972  // Get selector files from cache
973  if (gProofServ) {
975  TString ocwd = gSystem->WorkingDirectory();
977 
978  fSelector = TSelector::GetSelector(selector_file);
979 
980  gSystem->ChangeDirectory(ocwd);
982 
983  if (!fSelector) {
984  Error("AssertSelector", "cannot load: %s", selector_file );
985  return -1;
986  }
987  }
988 
990  Info("AssertSelector", "Processing via filename");
991  } else if (!fSelector) {
992  Error("AssertSelector", "no TSelector object define : cannot continue!");
993  return -1;
994  } else {
995  Info("AssertSelector", "Processing via TSelector object");
996  }
997  // Done
998  return 0;
999 }
1000 ////////////////////////////////////////////////////////////////////////////////
1001 /// Update fProgressStatus
1002 
1004 {
1005  if (fProgressStatus) {
1006  fProgressStatus->IncEntries(fProcessedRun);
1010  if (gMonitoringWriter)
1013  fProcessedRun = 0;
1014  }
1015 }
1016 
1017 ////////////////////////////////////////////////////////////////////////////////
1018 /// Process specified TDSet on PROOF worker.
1019 /// In the code visible here the return value is -1 when the selector cannot be
1020 /// asserted or when an exception is caught during the set-up phase, and 0
     /// otherwise. The value of TSelector::GetStatus() is not returned directly:
     /// it is added to the output list as the "PROOF_SelectorStatus" parameter
     /// (only when fExitStatus is not kAborted).
     ///
     /// \param dset          data set to iterate over
     /// \param selector_file passed to AssertSelector() to set up fSelector
     /// \param option        forwarded to TSelector::SetOption()
     /// \param nentries      number of entries; also passed to TEventIter::Create()
     /// \param first         first entry, passed to TEventIter::Create()
     ///
     /// NOTE(review): this listing omits some source lines (there are gaps in the
     /// listing numbers), so a few statements below appear truncated.
1021 
1022 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1023  Option_t *option, Long64_t nentries,
1024  Long64_t first)
1025 {
1026  PDB(kGlobal,1) Info("Process","Enter");
1027 
1029  fOutput = 0;
1030 
1031  TCleanup clean(this);
1032 
1033  fSelectorClass = 0;
1034  TString wmsg;
      // Set-up phase: any exception caught here makes the function return -1
1035  TRY {
1036  if (AssertSelector(selector_file) != 0 || !fSelector) {
1037  Error("Process", "cannot assert the selector object");
1038  return -1;
1039  }
1040 
1041  fSelectorClass = fSelector->IsA();
1042  Int_t version = fSelector->Version();
1043  if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1044 
      // From here on fOutput is the selector's own output list
1045  fOutput = (THashList *) fSelector->GetOutputList();
1046 
1047  if (gProofServ)
1048  TPerfStats::Start(fInput, fOutput);
1049 
      // The status object travels with the output list
1050  fSelStatus = new TStatus;
1051  fOutput->Add(fSelStatus);
1052 
1053  fSelector->SetOption(option);
1054  fSelector->SetInputList(fInput);
1055 
1056  // If in sequential (0-PROOF) mode validate the data set to get
1057  // the number of entries
1058  fTotalEvents = nentries;
1059  if (fTotalEvents < 0 && gProofServ &&
1061  dset->Validate();
1062  dset->Reset();
1063  TDSetElement *e = 0;
1064  while ((e = dset->Next())) {
1065  fTotalEvents += e->GetNum();
1066  }
1067  }
1068 
1069  dset->Reset();
1070 
1071  // Set parameters controlling the iterator behaviour: values found in the
      // input list are copied into the corresponding gEnv settings
1072  Int_t useTreeCache = 1;
1073  if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1074  if (useTreeCache > -1 && useTreeCache < 2)
1075  gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1076  }
1077  Long64_t cacheSize = -1;
1078  if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1079  TString sz = TString::Format("%lld", cacheSize);
1080  gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1081  }
1082  // Parallel unzipping
1083  Int_t useParallelUnzip = 0;
1084  if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1085  if (useParallelUnzip > -1 && useParallelUnzip < 2)
1086  gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1087  }
1088  // OS file caching (Mac Os X only)
1089  Int_t dontCacheFiles = 0;
1090  if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1091  if (dontCacheFiles == 1)
1092  gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1093  }
1094  fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
1095 
1096  // Control file object swap
1097  // <how>*10 + <force>
1098  // <how> = 0 end of run
1099  // 1 after each packet
1100  // <force> = 0 no, swap only if memory threshold is reached
1101  // 1 swap in all cases, according to <how>
      // The input-list parameter takes precedence over the gEnv setting
1102  Int_t opt = 0;
1103  if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1104  opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1105  }
1106  fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1107  fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1108  Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1109 
1110  // Memory threshold for file object swap
1111  Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1112  if (memfrac > 0.) {
1113  // The threshold is per core
1114  SysInfo_t si;
1115  if (gSystem->GetSysInfo(&si) == 0) {
1116  fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1117  Info("Process", "memory threshold for saving objects to file set to %ld kB",
1118  fSaveMemThreshold);
1119  } else {
1120  Error("Process", "cannot get SysInfo_t (!)");
1121  }
1122  }
1123 
      // Version-0 selectors only get Begin(); other versions get Begin() on the
      // client only and SlaveBegin() as long as the status is still OK
1124  if (version == 0) {
1125  PDB(kLoop,1) Info("Process","Call Begin(0)");
1126  fSelector->Begin(0);
1127  } else {
1128  if (IsClient()) {
1129  // on client (for local run)
1130  PDB(kLoop,1) Info("Process","Call Begin(0)");
1131  fSelector->Begin(0);
1132  }
1133  if (!fSelStatus->TestBit(TStatus::kNotOk)) {
1134  PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1135  fSelector->SlaveBegin(0); // Init is called explicitly
1136  // from GetNextEvent()
1137  }
1138  }
1139 
1140  } CATCH(excode) {
1142  Error("Process","exception %d caught", excode);
1144  return -1;
1145  } ENDTRY;
1146 
1147  // Set up saving of partial results (file-object swapping), if needed
1148  if (SavePartialResults(kFALSE) < 0)
1149  Warning("Process", "problems seetting up file-object swapping");
1150 
1151  // Create feedback lists, if required
1152  SetupFeedback();
1153 
1154  if (gMonitoringWriter)
1155  gMonitoringWriter->SendProcessingStatus("STARTED",kTRUE);
1156 
1157  PDB(kLoop,1)
1158  Info("Process","Looping over Process()");
1159 
1160  // get the byte read counter at the beginning of processing
1161  fReadBytesRun = TFile::GetFileBytesRead();
1162  fReadCallsRun = TFile::GetFileReadCalls();
1163  fProcessedRun = 0;
1164  // force the first monitoring info
1165  if (gMonitoringWriter)
1167 
1168  // Start asynchronous timer to dispatch pending events
1169  SetDispatchTimer(kTRUE);
1170 
1171  // Loop over range
1172  gAbort = kFALSE;
1173  Long64_t entry;
1176 
      // Processing phase: exceptions caught here do not cause an early return
1177  TRY {
1178 
1179  Int_t mrc = -1;
1180  // Get the frequency for checking memory consumption and logging information
      // (stays at -1, i.e. no periodic check, when the parameter is not set)
1181  Long64_t memlogfreq = -1;
1182  if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1183  Long64_t singleshot = 1;
1184  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1185  TString lastMsg("(unfortunately no detailed info is available about current packet)");
1186 
1187  // Initial memory footprint
1188  if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1189  Error("Process", "%s", wmsg.Data());
      // NOTE(review): gProofServ is dereferenced on the next line before the
      // null check that follows it - confirm it cannot be null on this path
1190  wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ->GetOrdinal()));
1191  fSelStatus->Add(wmsg.Data());
1192  if (gProofServ) {
1193  gProofServ->SendAsynMessage(wmsg.Data());
1195  }
1198  } else if (!wmsg.IsNull()) {
1199  Warning("Process", "%s", wmsg.Data());
1200  }
1201 
1202  TPair *currentElem = 0;
1203  // The event loop on the worker: one iteration per packet, for as long as
      // the status is OK and the selector has not requested an abort
1204  Long64_t fst = -1, num;
1205  Long_t maxproctime = -1;
1206  Bool_t newrun = kFALSE;
1207  while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1208  !fSelStatus->TestBit(TStatus::kNotOk) &&
1209  fSelector->GetAbort() == TSelector::kContinue) {
1210  // This is needed by the inflate infrastructure to calculate
1211  // sleeping times
1213 
1214  // Give the possibility to the selector to access additional info in the
1215  // incoming packet: the current element is published in the input list
      // under the key "PROOF_CurrentElement"
1216  if (dset->Current()) {
1217  if (!currentElem) {
1218  currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1219  fInput->Add(currentElem);
1220  } else {
1221  if (currentElem->Value() != dset->Current()) {
1222  currentElem->SetValue(dset->Current());
1223  } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1225  }
1226  }
      // On a new packet refresh the message reported by TProofServ
1227  if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1228  if (dset->TestBit(TDSet::kEmpty)) {
1229  lastMsg = "check logs for possible stacktrace - last cycle:";
1230  } else {
1231  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1232  TString fn = (elem) ? elem->GetFileName() : "<undef>";
1233  lastMsg.Form("while processing dset:'%s', file:'%s'"
1234  " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1235  }
1236  TProofServ::SetLastMsg(lastMsg);
1237  }
1238  // Set the max proc time, if any (the element value is multiplied by 1000
      // and later reported in msecs)
1239  if (dset->Current()->GetMaxProcTime() >= 0.)
1240  maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1241  newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1242  }
1243 
1246  // Setup packet proc time measurement
1247  if (maxproctime > 0) {
1248  if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1249  fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1250  if (!fProcTime) fProcTime = new TStopwatch();
1251  fProcTime->Reset(); // Reset counters
1252  }
      // Remember the packet size: num is decremented by the loop below
1253  Long64_t refnum = num;
1254  if (refnum < 0 && maxproctime <= 0) {
1255  wmsg.Form("neither entries nor max proc time specified:"
1256  " risk of infinite loop: processing aborted");
1257  Error("Process", "%s", wmsg.Data());
1258  if (gProofServ) {
1259  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1261  gProofServ->SendAsynMessage(wmsg.Data());
1262  }
1265  break;
1266  }
      // Loop over the entries of the packet; with refnum < 0 the loop is left
      // only through one of the 'break' statements below
1267  while (refnum < 0 || num--) {
1268 
1269  // Did we use all our time?
1271  fProcTime->Stop();
1272  if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1273  // How much are we left with?
1274  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1275  if (xleft < 0.2) {
1276  // Give another try, 1.5 times the remaining measured expected time
1277  Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1279  fProcTimeTimer->Start(mpt, kTRUE); // One shot
1281  }
1282  }
1284  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1285  maxproctime, lastMsg.Data());
1286 
1287  break;
1288  }
1289  }
1290 
      // Leave the entry loop as soon as the status is not OK or the selector
      // asked to abort
1291  if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1292  fSelector->GetAbort() == TSelector::kContinue)) break;
1293 
1294  // Get the entry number, taking into account entry or event lists
1295  entry = fEvIter->GetEntryNumber(fst);
1296  fst++;
1297 
1298  // Set the last entry
1299  TProofServ::SetLastEntry(entry);
1300 
      // Version-0 selectors use ProcessCut()/ProcessFill(), the others Process()
1301  if (fSelector->Version() == 0) {
1302  PDB(kLoop,3)
1303  Info("Process","Call ProcessCut(%lld)", entry);
1304  if (fSelector->ProcessCut(entry)) {
1305  PDB(kLoop,3)
1306  Info("Process","Call ProcessFill(%lld)", entry);
1307  fSelector->ProcessFill(entry);
1308  }
1309  } else {
1310  PDB(kLoop,3)
1311  Info("Process","Call Process(%lld)", entry);
1312  fSelector->Process(entry);
1313  if (fSelector->GetAbort() == TSelector::kAbortProcess) {
1315  break;
1316  } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1317  Info("Process", "packet processing aborted following the"
1318  " selector settings:\n%s", lastMsg.Data());
1319  fEvIter->InvalidatePacket();
1321  }
1322  }
      // Count the entry only if the status is still OK
1323  if (!fSelStatus->TestBit(TStatus::kNotOk)) fProcessedRun++;
1324 
1325  // Check the memory footprint, if required
1326  if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1327  if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1328  Error("Process", "%s", wmsg.Data());
1329  if (gProofServ) {
1330  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1331  gProofServ->GetOrdinal(), entry));
1332  gProofServ->SendAsynMessage(wmsg.Data());
1333  }
1337  break;
1338  } else {
1339  if (!wmsg.IsNull()) {
1340  Warning("Process", "%s", wmsg.Data());
1341  if (gProofServ) {
1342  wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1343  gProofServ->GetOrdinal(), entry));
1344  gProofServ->SendAsynMessage(wmsg.Data());
1345  }
1346  }
1347  }
1348  }
1350  gSystem->DispatchOneEvent(kTRUE);
1352  }
1354  if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1355 
1356  // Make sure that the selector abort status is reset
1357  if (fSelector->GetAbort() == TSelector::kAbortFile)
1358  fSelector->Abort("status reset", TSelector::kContinue);
1359  }
1360  }
1361 
1362  } CATCH(excode) {
1363  if (excode == kPEX_STOPPED) {
1364  Info("Process","received stop-process signal");
1366  } else if (excode == kPEX_ABORTED) {
1367  gAbort = kTRUE;
1368  Info("Process","received abort-process signal");
1370  } else {
1371  Error("Process","exception %d caught", excode);
1372  // Perhaps we need a dedicated status code here ...
1373  gAbort = kTRUE;
1375  }
1377  } ENDTRY;
1378 
1379  // Clean-up the envelope for the current element: the TPair and its key are
      // deleted here, its value is not
1380  TPair *currentElem = 0;
1381  if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1382  if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1383  delete currentElem->Key();
1384  delete currentElem;
1385  }
1386  }
1387 
1388  // Final memory footprint
1389  Long64_t singleshot = 1;
1390  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1391  Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1392  if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1393 
1394  PDB(kGlobal,2)
1395  Info("Process","%lld events processed", fProgressStatus->GetEntries());
1396 
1397  if (gMonitoringWriter) {
1401  }
1402 
1403  // Stop active timers
1404  SetDispatchTimer(kFALSE);
1405  if (fStopTimer != 0)
1406  SetStopTimer(kFALSE, gAbort);
1407  if (fFeedbackTimer != 0)
1408  HandleTimer(0);
1409 
1410  StopFeedback();
1411 
1412  // Save the results, if needed, closing the file
1413  if (SavePartialResults(kTRUE) < 0)
1414  Warning("Process", "problems saving the results to file");
1415 
1416  SafeDelete(fEvIter);
1417 
1418  // Finalize (skipped altogether when the exit status is kAborted)
1419 
1420  if (fExitStatus != kAborted) {
1421 
1422  TIter nxo(GetOutputList());
1423  TObject *o = 0;
1424  while ((o = nxo())) {
1425  // Special treatment for files
1426  if (o->IsA() == TProofOutputFile::Class()) {
1428  of->Print();
1430  const char *dir = of->GetDir();
1431  if (!dir || (dir && strlen(dir) <= 0)) {
1433  } else if (dir && strlen(dir) > 0) {
      // Replace a loopback host name in the directory URL with the FQDN of
      // this host
1434  TUrl u(dir);
1435  if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1436  !strcmp(u.GetHost(), "localhost.localdomain")) {
1437  u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
1438  of->SetDir(u.GetUrl(kTRUE));
1439  }
1440  of->Print();
1441  }
1442  }
1443  }
1444 
1446 
      // Terminate calls mirror the Begin calls: Terminate() for version 0,
      // otherwise SlaveTerminate() plus Terminate() on the client only
1447  if (!fSelStatus->TestBit(TStatus::kNotOk)) {
1448  if (fSelector->Version() == 0) {
1449  PDB(kLoop,1) Info("Process","Call Terminate()");
1450  fSelector->Terminate();
1451  } else {
1452  PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1453  fSelector->SlaveTerminate();
1454  if (IsClient() && !fSelStatus->TestBit(TStatus::kNotOk)) {
1455  PDB(kLoop,1) Info("Process","Call Terminate()");
1456  fSelector->Terminate();
1457  }
1458  }
1459  }
1460 
1461  // Add Selector status in the output list so it can be returned to the client as done
1462  // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1463  fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1464 
1465  if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1466  TIter nxc(gROOT->GetListOfCanvases());
1467  while (TObject *c = nxc())
1468  fOutput->Add(c);
1469  }
1470  }
1471 
1472  if (gProofServ)
1473  TPerfStats::Stop();
1474 
1475  return 0;
1476 }
1477 
1478 ////////////////////////////////////////////////////////////////////////////////
1479 /// Process specified TDSet on PROOF worker with TSelector object.
1480 /// Returns -1 if 'selector' is null; otherwise installs it as fSelector and
1481 /// returns whatever Process(dset, (const char *)0, option, nentries, first) returns.
     /// NOTE(review): the first line of the signature is missing from this listing.
1482 
1484  Option_t *option, Long64_t nentries,
1485  Long64_t first)
1486 {
1487  if (!selector) {
1488  Error("Process", "selector object undefiend!");
1489  return -1;
1490  }
1491 
      // The previous selector is deleted only when fCreateSelObj is set
      // (presumably meaning it was created by the player itself - confirm)
1492  if (fCreateSelObj) SafeDelete(fSelector);
1493  fSelector = selector;
      // Delegate to the file-based overload with a null selector file
1495  return Process(dset, (const char *)0, option, nentries, first);
1496 }
1497 
1498 ////////////////////////////////////////////////////////////////////////////////
1499 /// Not implemented: meaningful only in the remote player. Always returns kFALSE.
     /// NOTE(review): the signature line is missing from this listing.
1500 
1502 {
1503  return kFALSE;
1504 }
1505 
1506 ////////////////////////////////////////////////////////////////////////////////
1507 /// Check the memory usage, if requested.
1508 /// Return kFALSE, with 'wmsg' describing the problem, when one of the
1509 /// "STOP processing" branches for virtual or resident memory is taken;
     /// return kTRUE in all other cases, including when no check is done or when
     /// TSystem::GetProcInfo() fails.
     /// A check is done only if mfreq > 0 and the number of processed events
     /// (GetEventsProcessed() + fProcessedRun) is a multiple of mfreq.
     /// When usage exceeds TProofServ::GetMemHWM() times the relevant maximum, the
     /// warning is written to 'wmsg', 'mfreq' is set to 1 and the corresponding
     /// flag (w80v / w80r) is cleared so that the warning is issued only once.
     /// NOTE(review): the first line of the signature and the two stop conditions
     /// are missing from this listing; presumably they compare against
     /// TProofServ::GetMemStop() times the maximum, as the messages suggest.
1510 
1512  Bool_t &w80v, TString &wmsg)
1513 {
1514  Long64_t processed = GetEventsProcessed() + fProcessedRun;
1515  if (mfreq > 0 && processed%mfreq == 0) {
1516  // Record the memory information
1517  ProcInfo_t pi;
1518  if (!gSystem->GetProcInfo(&pi)){
      // Start from an empty message; it is filled only if a threshold is passed
1519  wmsg = "";
1520  if (gProofServ)
1521  Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1522  pi.fMemVirtual, pi.fMemResident, processed);
1523  // Save info in TStatus
1524  fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1525  // Apply limit on virtual memory, if any: warn above the GetMemHWM()
      // fraction of the max; the stop branch is reported with GetMemStop()
1526  if (TProofServ::GetVirtMemMax() > 0) {
1528  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1529  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1530  return kFALSE;
1531  } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1532  // Refine monitoring
1533  mfreq = 1;
1534  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1535  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1536  w80v = kFALSE;
1537  }
1538  }
1539  // Apply limit on resident memory, if any: same scheme as for virtual memory
1540  if (TProofServ::GetResMemMax() > 0) {
1542  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1543  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1544  return kFALSE;
1545  } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1546  // Refine monitoring
1547  mfreq = 1;
      // A non-empty wmsg means the virtual-memory warning was just raised too
1548  if (wmsg.Length() > 0) {
1549  wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1550  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1551  } else {
1552  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1553  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1554  }
1555  w80r = kFALSE;
1556  }
1557  }
1558  // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1559  // to TSystem::GetProcInfo in SavePartialResults
1560  if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1561  }
1562  }
1563  // Done
1564  return kTRUE;
1565 }
1566 
1567 ////////////////////////////////////////////////////////////////////////////////
1568 /// Finalize query (may not be used in this class).
     /// Calls MayNotUse("Finalize") and always returns -1.
     /// NOTE(review): the signature line is missing from this listing.
1569 
1571 {
1572  MayNotUse("Finalize");
1573  return -1;
1574 }
1575 
1576 ////////////////////////////////////////////////////////////////////////////////
1577 /// Finalize query (may not be used in this class).
     /// Second stub with the same body: calls MayNotUse("Finalize") and always
     /// returns -1.
     /// NOTE(review): the signature line is missing from this listing.
1578 
1580 {
1581  MayNotUse("Finalize");
1582  return -1;
1583 }
1584 ////////////////////////////////////////////////////////////////////////////////
1585 /// Merge output (may not be used in this class).
     /// Only calls MayNotUse("MergeOutput").
     /// NOTE(review): the signature line is missing from this listing.
1586 
1588 {
1589  MayNotUse("MergeOutput");
1590  return;
1591 }
1592 
1593 ////////////////////////////////////////////////////////////////////////////////
     /// Adds the object 'olsdm' to the output list.
     /// NOTE(review): the signature and the line defining 'olsdm' are missing
     /// from this listing, so its type and purpose cannot be stated from here.
1594 
1596 {
1598  fOutput->Add(olsdm);
1599 }
1600 
1601 ////////////////////////////////////////////////////////////////////////////////
1602 /// Update automatic binning parameters for given object "name".
     /// The first call for a given name stores the limits passed in (after an
     /// optional TProofLimitsFinder::AutoBinFunc() call when gProofServ exists
     /// and is not the top master); later calls overwrite the arguments with the
     /// stored values.
     /// NOTE(review): most of the signature is missing from this listing.
1603 
1607  Double_t& zmin, Double_t& zmax)
1608 {
      // The list of per-object values is created on first use
1609  if ( fAutoBins == 0 ) {
1610  fAutoBins = new THashList;
1611  }
1612 
1613  TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1614 
1615  if ( val == 0 ) {
1616  //look for info in higher master
1617  if (gProofServ && !gProofServ->IsTopMaster()) {
1618  TString key = name;
1619  TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
1620  }
1621 
      // Remember the limits for subsequent calls with the same name
1622  val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1623  fAutoBins->Add(val);
1624  } else {
      // Already known: hand the stored limits back to the caller
1625  val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1626  }
1627 }
1628 
1629 ////////////////////////////////////////////////////////////////////////////////
1630 /// Get next packet (may not be used in this class).
     /// Calls MayNotUse("GetNextPacket") and always returns 0.
     /// NOTE(review): the signature line is missing from this listing.
1631 
1633 {
1634  MayNotUse("GetNextPacket");
1635  return 0;
1636 }
1637 
1638 ////////////////////////////////////////////////////////////////////////////////
1639 /// Set up feedback (may not be used in this class).
     /// Only calls MayNotUse("SetupFeedback").
     /// NOTE(review): the signature line is missing from this listing.
1640 
1642 {
1643  MayNotUse("SetupFeedback");
1644 }
1645 
1646 ////////////////////////////////////////////////////////////////////////////////
1647 /// Stop feedback (may not be used in this class).
     /// Only calls MayNotUse("StopFeedback").
     /// NOTE(review): the signature line is missing from this listing.
1648 
1650 {
1651  MayNotUse("StopFeedback");
1652 }
1653 
1654 ////////////////////////////////////////////////////////////////////////////////
1655 /// Draw (may not be used in this class).
     /// All arguments are ignored: the function calls MayNotUse("DrawSelect") and
     /// always returns -1.
1656 
1657 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1658  const char * /*selection*/, Option_t * /*option*/,
1659  Long64_t /*nentries*/, Long64_t /*firstentry*/)
1660 {
1661  MayNotUse("DrawSelect");
1662  return -1;
1663 }
1664 
1665 ////////////////////////////////////////////////////////////////////////////////
1666 /// Tree header request (may not be used in this class).
     /// Only calls MayNotUse("HandleGetTreeHeader|").
     /// NOTE(review): the signature line is missing from this listing.
1667 
1669 {
1670  MayNotUse("HandleGetTreeHeader|");
1671 }
1672 
1673 ////////////////////////////////////////////////////////////////////////////////
1674 /// Receive histo from slave.
     /// Reads an object from the message; if it inherits from TH1 it is detached
     /// from any directory and added to the histogram with the same name found in
     /// the list of gDirectory, when there is one.
     /// NOTE(review): the signature line and the statement of the 'else' branch
     /// are missing from this listing.
1675 
1677 {
1678  TObject *obj = mess->ReadObject(mess->GetClass());
      // NOTE(review): obj is dereferenced without a null check - confirm that
      // ReadObject() cannot return null here
1679  if (obj->InheritsFrom(TH1::Class())) {
1680  TH1 *h = (TH1*)obj;
1681  h->SetDirectory(0);
      // Look for an already existing histogram with the same name
1682  TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1683  if (horg)
1684  horg->Add(h);
1685  else
1687  }
1688 }
1689 
1690 ////////////////////////////////////////////////////////////////////////////////
1691 /// Draw the object through the "DrawCanvas" hook looked up in libProofDraw.
1692 /// Returns the value returned by the hook; returns 1 if the hook could not be
1693 /// set up or 'obj' is null. (The original note says the hook returns 0 on
     /// success and 1 if the object is not a canvas; the hook itself is not
     /// visible here.)
     /// The hook pointer is cached in a function-local static; while it is still
     /// null every call repeats the library lookup and the related warning.
     /// NOTE(review): the signature line is missing from this listing.
1694 
1696 {
1697  static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1698 
1699  // Load the library the first time
1700  if (!gDrawCanvasHook) {
1701  // Load library needed for graphics ...
1702  TString drawlib = "libProofDraw";
1703  char *p = 0;
1704  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
      // The path is only needed as an existence check
1705  delete[] p;
1706  if (gSystem->Load(drawlib) != -1) {
1707  // Locate DrawCanvas
1708  Func_t f = 0;
1709  if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1710  gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1711  else
1712  Warning("DrawCanvas", "can't find DrawCanvas");
1713  } else
1714  Warning("DrawCanvas", "can't load %s", drawlib.Data());
1715  } else
1716  Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1717  }
1718  if (gDrawCanvasHook && obj)
1719  return (*gDrawCanvasHook)(obj);
1720  // No drawing hook or object undefined
1721  return 1;
1722 }
1723 
1724 ////////////////////////////////////////////////////////////////////////////////
1725 /// Parse the arguments from var, sel and opt and fill the selector and
1726 /// object name accordingly, by forwarding all five arguments to the
1727 /// "GetDrawArgs" hook looked up in libProofDraw.
     /// Returns the value returned by the hook (0 in case of success, according
     /// to the original note); returns 1 if the hook could not be set up.
     /// The hook pointer is cached in a function-local static; while it is still
     /// null every call repeats the library lookup and the related warning.
1728 
1729 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1730  TString &selector, TString &objname)
1731 {
1732  static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1733  TString &, TString &) = 0;
1734 
1735  // Load the library the first time
1736  if (!gGetDrawArgsHook) {
1737  // Load library needed for graphics ...
1738  TString drawlib = "libProofDraw";
1739  char *p = 0;
1740  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
      // The path is only needed as an existence check
1741  delete[] p;
1742  if (gSystem->Load(drawlib) != -1) {
1743  // Locate GetDrawArgs
1744  Func_t f = 0;
1745  if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1746  gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1747  TString &, TString &))(f);
1748  else
1749  Warning("GetDrawArgs", "can't find GetDrawArgs");
1750  } else
1751  Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1752  } else
1753  Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1754  }
1755  if (gGetDrawArgsHook)
1756  return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1757  // No parser hook available
1758  return 1;
1759 }
1760 
1761 ////////////////////////////////////////////////////////////////////////////////
1762 /// Create/destroy a named canvas for feedback, by forwarding 'name' and
     /// 'create' to the "FeedBackCanvas" hook looked up in libProofDraw.
     /// If the hook cannot be set up the call only emits a warning and does
     /// nothing else.
     /// The hook pointer is cached in a function-local static; while it is still
     /// null every call repeats the library lookup and the related warning.
1763 
1764 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
1765 {
1766  static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1767 
1768  // Load the library the first time
1769  if (!gFeedBackCanvasHook) {
1770  // Load library needed for graphics ...
1771  TString drawlib = "libProofDraw";
1772  char *p = 0;
1773  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
      // The path is only needed as an existence check
1774  delete[] p;
1775  if (gSystem->Load(drawlib) != -1) {
1776  // Locate FeedBackCanvas
1777  Func_t f = 0;
1778  if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1779  gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1780  else
1781  Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1782  } else
1783  Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1784  } else
1785  Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1786  }
1787  if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1788  // Nothing else to do, whether or not the hook was available
1789  return;
1790 }
1791 
1792 ////////////////////////////////////////////////////////////////////////////////
1793 /// Return the size in bytes of the cache, as reported by the event iterator;
     /// returns -1 when there is no event iterator.
     /// NOTE(review): the signature line is missing from this listing.
1794 
1796 {
1797  if (fEvIter) return fEvIter->GetCacheSize();
1798  return -1;
1799 }
1800 
1801 ////////////////////////////////////////////////////////////////////////////////
1802 /// Return the number of entries in the learning phase, as reported by the
     /// event iterator; returns -1 when there is no event iterator.
     /// NOTE(review): the signature line is missing from this listing.
1803 
1805 {
1806  if (fEvIter) return fEvIter->GetLearnEntries();
1807  return -1;
1808 }
1809 
1810 ////////////////////////////////////////////////////////////////////////////////
1811 /// Switch on/off merge timer.
     /// Switching on creates the stopwatch if needed. Switching off stops it and
     /// stores the elapsed real time in the current query, if any: as merge time
     /// (together with the number of mergers) on the master or in Lite mode, as
     /// receive time on a standard client.
     /// NOTE(review): the signature line and the statement controlled by the
     /// last 'if' of the 'on' branch are missing from this listing.
1812 
1814 {
1815  if (on) {
1816  if (!fMergeSTW) fMergeSTW = new TStopwatch();
1817  PDB(kGlobal,1)
1818  Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1819  if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1821  } else if (fMergeSTW) {
1822  fMergeSTW->Stop();
1823  Float_t rt = fMergeSTW->RealTime();
1824  PDB(kGlobal,1)
1825  Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1826  if (fQuery) {
1827  if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1828  // On the master (or in Lite()) we set the merging time and the number of mergers
1829  fQuery->SetMergeTime(rt);
1830  fQuery->SetNumMergers(fNumMergers);
1831  } else {
1832  // In a standard client we save the transfer-to-client time
1833  fQuery->SetRecvTime(rt);
1834  }
1835  PDB(kGlobal,2) fQuery->Print("F");
1836  }
1837  }
1838 }
1839 
1840 //------------------------------------------------------------------------------
1841 
1843 
1844 ////////////////////////////////////////////////////////////////////////////////
1845 /// Process the specified TSelector object 'nentries' times.
1846 /// Used to test the PROOF iterator mechanism for cycle-driven selectors in a
1847 /// local session.
1848 /// Returns -1 if 'selector' is null; otherwise returns the value of
1849 /// Process(set, selector, option, nentries) called on a temporary, empty data set.
1850 
1851 Long64_t TProofPlayerLocal::Process(TSelector *selector,
1852  Long64_t nentries, Option_t *option)
1853 {
1854  if (!selector) {
1855  Error("Process", "selector object undefiend!");
1856  return -1;
1857  }
1858 
      // Temporary data set flagged as empty and local; deleted after the call
1859  TDSetProxy *set = new TDSetProxy("", "", "");
1860  set->SetBit(TDSet::kEmpty);
1861  set->SetBit(TDSet::kIsLocal);
1862  Long64_t rc = Process(set, selector, option, nentries);
1863  SafeDelete(set);
1864 
1865  // Done
1866  return rc;
1867 }
1868 
1869 ////////////////////////////////////////////////////////////////////////////////
1870 /// Process the specified TSelector file 'nentries' times.
1871 /// Used to test the PROOF iterator mechanism for cycle-driven selectors in a
1872 /// local session.
1873 /// Same scheme as the overload taking a TSelector object, without the null
1874 /// check: returns the value of Process(set, selector, option, nentries) called
1875 /// on a temporary, empty data set.
     /// NOTE(review): the first line of the signature is missing from this listing.
1876 
1878  Long64_t nentries, Option_t *option)
1879 {
      // Temporary data set flagged as empty and local; deleted after the call
1880  TDSetProxy *set = new TDSetProxy("", "", "");
1881  set->SetBit(TDSet::kEmpty);
1882  set->SetBit(TDSet::kIsLocal);
1883  Long64_t rc = Process(set, selector, option, nentries);
1884  SafeDelete(set);
1885 
1886  // Done
1887  return rc;
1888 }
1889 
1890 
1891 //------------------------------------------------------------------------------
1892 
1894 
1895 ////////////////////////////////////////////////////////////////////////////////
1896 /// Destructor.
     /// Deletes the output list, the output lists container, the feedback lists,
     /// the packetizer and the process message.
     /// NOTE(review): the signature line is missing from this listing.
1897 
1899 {
1900  SafeDelete(fOutput); // owns the output list
1901  SafeDelete(fOutputLists);
1902 
1903  // Objects stored in maps are already deleted when merging the feedback
1904  SafeDelete(fFeedbackLists);
1905  SafeDelete(fPacketizer);
1906 
1907  if (fProcessMessage)
1908  SafeDelete(fProcessMessage);
1909 }
1910 
1911 ////////////////////////////////////////////////////////////////////////////////
1912 /// Init the packetizer
1913 /// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1914 
1916  Long64_t first, const char *defpackunit,
1917  const char *defpackdata)
1918 {
1920  PDB(kGlobal,1) Info("Process","Enter");
1921  fDSet = dset;
1923 
1924  // This is done here to pickup on the fly changes
1925  Int_t honebyone = 1;
1926  if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1927  honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1928  fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1929 
1930  Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1931 
1932  TString packetizer;
1933  TList *listOfMissingFiles = 0;
1934 
1935  TMethodCall callEnv;
1936  TClass *cl;
1937  noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1938 
1939  if (noData) {
1940 
1941  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1942  packetizer = defpackunit;
1943  else
1944  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1945 
1946  // Get linked to the related class
1947  cl = TClass::GetClass(packetizer);
1948  if (cl == 0) {
1949  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1951  return -1;
1952  }
1953 
1954  // Init the constructor
1955  callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1956  if (!callEnv.IsValid()) {
1957  Error("InitPacketizer",
1958  "cannot find correct constructor for '%s'", cl->GetName());
1960  return -1;
1961  }
1962  callEnv.ResetParam();
1964  callEnv.SetParam((Long64_t) nentries);
1965  callEnv.SetParam((Long_t) fInput);
1966  callEnv.SetParam((Long_t) fProgressStatus);
1967 
1968  } else if (dset->TestBit(TDSet::kMultiDSet)) {
1969 
1970  // We have to process many datasets in one go, keeping them separate
1971  if (fProof->GetRunStatus() != TProof::kRunning) {
1972  // We have been asked to stop
1973  Error("InitPacketizer", "received stop/abort request");
1975  return -1;
1976  }
1977 
1978  // The multi packetizer
1979  packetizer = "TPacketizerMulti";
1980 
1981  // Get linked to the related class
1982  cl = TClass::GetClass(packetizer);
1983  if (cl == 0) {
1984  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1986  return -1;
1987  }
1988 
1989  // Init the constructor
1990  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1991  if (!callEnv.IsValid()) {
1992  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1994  return -1;
1995  }
1996  callEnv.ResetParam();
1997  callEnv.SetParam((Long_t) dset);
1999  callEnv.SetParam((Long64_t) first);
2000  callEnv.SetParam((Long64_t) nentries);
2001  callEnv.SetParam((Long_t) fInput);
2002  callEnv.SetParam((Long_t) fProgressStatus);
2003 
2004  // We are going to test validity during the packetizer initialization
2005  dset->SetBit(TDSet::kValidityChecked);
2006  dset->ResetBit(TDSet::kSomeInvalid);
2007 
2008  } else {
2009 
2010  // Lookup - resolve the end-point urls to optmize the distribution.
2011  // The lookup was previously called in the packetizer's constructor.
2012  // A list for the missing files may already have been added to the
2013  // output list; otherwise, if needed it will be created inside
2014  if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2015  // Move it to the output list
2016  fInput->Remove(listOfMissingFiles);
2017  } else {
2018  listOfMissingFiles = new TList;
2019  }
2020  // Do the lookup; we only skip it if explicitly requested so.
2021  TString lkopt;
2022  if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2023  dset->Lookup(kTRUE, &listOfMissingFiles);
2024 
2025  if (fProof->GetRunStatus() != TProof::kRunning) {
2026  // We have been asked to stop
2027  Error("InitPacketizer", "received stop/abort request");
2029  return -1;
2030  }
2031 
2032  if (!(dset->GetListOfElements()) ||
2033  !(dset->GetListOfElements()->GetSize())) {
2034  if (gProofServ)
2035  gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2036  Error("InitPacketizer", "No files from the data set were found - Aborting");
2038  if (listOfMissingFiles) {
2039  listOfMissingFiles->SetOwner();
2040  fOutput->Remove(listOfMissingFiles);
2041  SafeDelete(listOfMissingFiles);
2042  }
2043  return -1;
2044  }
2045 
2046  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2047  // Using standard packetizer TAdaptivePacketizer
2048  packetizer = defpackdata;
2049  else
2050  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2051 
2052  // Get linked to the related class
2053  cl = TClass::GetClass(packetizer);
2054  if (cl == 0) {
2055  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2057  return -1;
2058  }
2059 
2060  // Init the constructor
2061  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2062  if (!callEnv.IsValid()) {
2063  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2065  return -1;
2066  }
2067  callEnv.ResetParam();
2068  callEnv.SetParam((Long_t) dset);
2070  callEnv.SetParam((Long64_t) first);
2071  callEnv.SetParam((Long64_t) nentries);
2072  callEnv.SetParam((Long_t) fInput);
2073  callEnv.SetParam((Long_t) fProgressStatus);
2074 
2075  // We are going to test validity during the packetizer initialization
2076  dset->SetBit(TDSet::kValidityChecked);
2077  dset->ResetBit(TDSet::kSomeInvalid);
2078  }
2079 
2080  // Get an instance of the packetizer
2081  Long_t ret = 0;
2082  callEnv.Execute(ret);
2083  if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2084  Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2086  return -1;
2087  }
2088 
2089  if (!fPacketizer->IsValid()) {
2090  Error("InitPacketizer",
2091  "instantiated packetizer object '%s' is invalid", cl->GetName());
2094  return -1;
2095  }
2096 
2097  // In multi mode retrieve the list of missing files
2098  if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2099  if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2100  // Remove it; it will be added to the output list
2101  fInput->Remove(listOfMissingFiles);
2102  }
2103  }
2104 
2105  if (!noData) {
2106  // Add invalid elements to the list of missing elements
2107  TDSetElement *elem = 0;
2108  if (dset->TestBit(TDSet::kSomeInvalid)) {
2109  TIter nxe(dset->GetListOfElements());
2110  while ((elem = (TDSetElement *)nxe())) {
2111  if (!elem->GetValid()) {
2112  if (!listOfMissingFiles)
2113  listOfMissingFiles = new TList;
2114  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2115  dset->Remove(elem, kFALSE);
2116  }
2117  }
2118  // The invalid elements have been removed
2120  }
2121 
2122  // Record the list of missing or invalid elements in the output list
2123  if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2124  TIter missingFiles(listOfMissingFiles);
2125  TString msg;
2126  if (gDebug > 0) {
2127  TFileInfo *fi = 0;
2128  while ((fi = (TFileInfo *) missingFiles.Next())) {
2129  if (fi->GetCurrentUrl()) {
2130  msg = Form("File not found: %s - skipping!",
2131  fi->GetCurrentUrl()->GetUrl());
2132  } else {
2133  msg = Form("File not found: %s - skipping!", fi->GetName());
2134  }
2136  }
2137  }
2138  // Make sure it will be sent back
2139  if (!GetOutput("MissingFiles")) {
2140  listOfMissingFiles->SetName("MissingFiles");
2141  AddOutputObject(listOfMissingFiles);
2142  }
2143  TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2144  if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2145 
2146  // Estimate how much data are missing
2147  Int_t ngood = dset->GetListOfElements()->GetSize();
2148  Int_t nbad = listOfMissingFiles->GetSize();
2149  Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2150  msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2151  " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2152  tmpStatus->Add(msg.Data());
2153  msg = Form(" +++\n"
2154  " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2155  " the 'MissingFiles' list\n"
2156  " +++", xb * 100., '%', nbad, nbad + ngood);
2158  } else {
2159  // Cleanup
2160  SafeDelete(listOfMissingFiles);
2161  }
2162  }
2163 
2164  // Done
2165  return 0;
2166 }
2167 
2168 ////////////////////////////////////////////////////////////////////////////////
2169 /// Process specified TDSet on PROOF.
2170 /// This method is called on client and on the PROOF master.
2171 /// The return value is -1 in case of an error and TSelector::GetStatus()
2172 /// in case of success.
2173 
/// \param dset           data set to process; stored in fDSet. On the master a TDSetProxy
///                       built from its type, object name and directory is broadcast instead.
/// \param selector_file  selector file (or class) name; stored in fSelectorFileName and
///                       shipped with SendSelector() when fCreateSelObj is set.
/// \param option         processing option; also used to derive the query mode (sync/async)
///                       via TProof::GetQueryMode().
/// \param nentries       number of entries requested; handed to InitPacketizer() on the master.
/// \param first          first entry requested; reset to 0 on the master once the packetizer
///                       has been initialized.
/// \return -1 on the error paths below; fProof->fSeqNum for asynchronous client queries or
///         when the session turns out not to be synchronous after Collect(); otherwise the
///         value returned by Finalize(kFALSE, sync).
2174 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2175  Option_t *option, Long64_t nentries,
2176  Long64_t first)
2177 {
2178  PDB(kGlobal,1) Info("Process", "Enter");
2179 
2180  fDSet = dset;
2182 
      // Nothing can be done without a progress status object
2183  if (!fProgressStatus) {
2184  Error("Process", "No progress status");
2185  return -1;
2186  }
2188 
2189  // delete fOutput;
      // Start from an empty output list: create it on first use, clear it otherwise
2190  if (!fOutput)
2191  fOutput = new THashList;
2192  else
2193  fOutput->Clear();
2194 
2196 
      // Performance statistics are started on the master only
2197  if (fProof->IsMaster()){
2198  TPerfStats::Start(fInput, fOutput);
2199  } else {
2201  }
2202 
      // Stopwatch used below to record the real time spent in the query
2203  TStopwatch elapsed;
2204 
2205  // Define filename
2206  TString fn;
2207  fSelectorFileName = selector_file;
2208 
      // When the selector is created from a file, ship the file and refer to it
      // by base name only; otherwise the name is forwarded unchanged
2209  if (fCreateSelObj) {
2210  if(!SendSelector(selector_file)) return -1;
2211  fn = gSystem->BaseName(selector_file);
2212  } else {
2213  fn = selector_file;
2214  }
2215 
2216  TMessage mesg(kPROOF_PROCESS);
2217 
2218  // Parse option
2219  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2220 
2221  TList *inputtmp = 0; // List of temporary input objects
2222  TDSet *set = dset;
2223  if (fProof->IsMaster()) {
2224 
      // --- Master side: set up the proxy data set and the packetizer ---
2225  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2226  set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2227  dset->GetDirectory() );
2228  if (dset->TestBit(TDSet::kEmpty))
2229  set->SetBit(TDSet::kEmpty);
2230 
      // NOTE(review): in the lines visible here the TDSetProxy allocated above
      // is not released on this error return - confirm ownership.
2231  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2232  Error("Process", "cannot init the packetizer");
2234  return -1;
2235  }
2236 
2237  // Reset start, this is now managed by the packetizer
2238  first = 0;
2239 
2240  // Negative memlogfreq disable checks.
2241  // If 0 is passed we try to have 100 messages about memory
2242  // Otherwise we use the frequency passed.
2243  Int_t mrc = -1;
2244  Long64_t memlogfreq = -1, mlf;
      // First source: the PROOF_MEMLOGFREQ environment variable (digits only)
2245  if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2246  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2247  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2248  }
      // Second source: the 'PROOF_MemLogFreq' input parameter, which takes precedence.
      // NOTE(review): this assignment overwrites the 'mrc = 0' possibly set from the
      // environment just above, so when the parameter lookup fails the env-derived
      // value does not reach the SetParameter() call below - confirm this is intended.
2249  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2250  if (memlogfreq == 0) {
      // NOTE(review): the divisor is fProof->GetParallel()*100; confirm that
      // GetParallel() cannot return 0 at this point.
2251  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2252  if (memlogfreq <= 0) memlogfreq = 1;
2253  }
2254  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2255 
2256 
2257  // Send input data, if any
2258  TString emsg;
2259  if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2260  Warning("Process", "could not forward input data: %s", emsg.Data());
2261 
2262  // Attach to the transient histogram with the assigned packets, if required
2263  if (fInput->FindObject("PROOF_StatsHist") != 0) {
2264  if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2265  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2266  } else {
2267  PDB(kLoop,1)
2268  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2269  " packets being processed");
2270  }
2271  }
2272 
2273  } else {
2274 
      // --- Client side: prepare the input list and run the selector's Begin() ---
2275  // Check whether we have to enforce the use of submergers
2276  if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2277  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2278  if (smg >= 0) {
2279  fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2280  if (gEnv->Lookup("Proof.MergersByHost")) {
2281  Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2282  if (mbh != 0) {
2283  // Administrator settings have the priority
2284  TObject *o = 0;
2285  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2286  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2287  }
2288  }
2289  }
2290  }
2291 
2292  // For a new query clients should make sure that the temporary
2293  // output list is empty
2294  if (fOutputLists) {
2295  fOutputLists->Delete();
2296  delete fOutputLists;
2297  fOutputLists = 0;
2298  }
2299 
2300  if (!sync) {
2302  Printf(" ");
2303  Info("Process","starting new query");
2304  }
2305 
2306  // Define fSelector in Client if processing with filename
2307  if (fCreateSelObj) {
2308  SafeDelete(fSelector);
2309  if (!(fSelector = TSelector::GetSelector(selector_file))) {
2310  if (!sync)
2311  gSystem->RedirectOutput(0);
2312  return -1;
2313  }
2314  }
2315 
      // NOTE(review): when fCreateSelObj is false fSelector is dereferenced here
      // without a null check; presumably the caller has set it - confirm.
      // The preceding assignment to 0 is immediately overwritten.
2316  fSelectorClass = 0;
2317  fSelectorClass = fSelector->IsA();
2318 
2319  // Add fSelector to inputlist if processing with object
2320  if (!fCreateSelObj) {
2321  // If any input list was set into the selector move it to the PROOF
2322  // input list, because we do not want to stream the selector one
2323  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2324  TIter nxi(fSelector->GetInputList());
2325  TObject *o = 0;
2326  while ((o = nxi())) {
2327  if (!fInput->FindObject(o)) {
2328  fInput->Add(o);
      // Remember what was added so it can be removed again after Finalize();
      // the bookkeeping list does not own the objects
2329  if (!inputtmp) {
2330  inputtmp = new TList;
2331  inputtmp->SetOwner(kFALSE);
2332  }
2333  inputtmp->Add(o);
2334  }
2335  }
2336  }
2337  fInput->Add(fSelector);
2338  }
2339  // Set the input list for initialization
2340  fSelector->SetInputList(fInput);
2341  fSelector->SetOption(option);
2342  if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();
2343 
2344  PDB(kLoop,1) Info("Process","Call Begin(0)");
2345  fSelector->Begin(0);
2346 
2347  // Reset the input list to avoid double streaming and related problems (saving
2348  // the TQueryResult)
2349  if (!fCreateSelObj) fSelector->SetInputList(0);
2350 
2351  // Send large input data objects, if any
2353 
2354  if (!sync)
2355  gSystem->RedirectOutput(0);
2356  }
2357 
      // --- Common part: build and broadcast the process message ---
2358  TCleanup clean(this);
2359  SetupFeedback();
2360 
2361  TString opt = option;
2362 
2363  // Old servers need a dedicated streamer
2364  if (fProof->fProtocol < 13)
2365  dset->SetWriteV3(kTRUE);
2366 
2367  // Workers will get the entry ranges from the packetizer
2368  Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
2369  Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2370 
2371  // Entry- or Event- list ?
      // Only resolved on the client; on the master both pointers stay null
2372  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2373  : (TEntryList *)0;
2374  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2375  : (TEventList *)0;
      // For protocol > 14 the entry list is streamed too, and the same content is
      // kept in fProcessMessage
2376  if (fProof->fProtocol > 14) {
2377  if (fProcessMessage) delete fProcessMessage;
2379  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2380  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381  } else {
2382  mesg << set << fn << fInput << opt << num << fst << evl << sync;
2383  if (enl)
2384  // Not supported remotely
2385  Warning("Process","entry lists not supported by the server");
2386  }
2387 
2388  // Reset the merging progress information
2389  fProof->ResetMergePrg();
2390 
2391  Int_t nb = fProof->Broadcast(mesg);
2392  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2393  if (fProof->IsLite()) fProof->fNotIdle += nb;
2394 
2395  // Reset streamer choice
2396  if (fProof->fProtocol < 13)
2397  dset->SetWriteV3(kFALSE);
2398 
2399  // Redirect logs from master to special log frame
2400  if (IsClient())
2401  fProof->fRedirLog = kTRUE;
2402 
2403  if (!IsClient()){
2404  // Signal the start of finalize for the memory log grepping
2405  Info("Process|Svc", "Start merging Memory information");
2406  }
2407 
      // --- Collect the results, asynchronously or synchronously ---
2408  if (!sync) {
2409  if (IsClient()) {
2410  // Asynchronous query: just make sure that asynchronous input
2411  // is enabled and return the prompt
2412  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2413  " activating CollectInputFrom");
2414  fProof->Activate();
2415 
2416  // Receive the acknowledgement and query sequential number
2417  fProof->Collect();
2418 
      // NOTE(review): 'inputtmp' (if allocated above) is neither emptied nor
      // deleted on this return path - confirm whether that is intended.
2419  return fProof->fSeqNum;
2420 
2421  } else {
2422  PDB(kGlobal,1) Info("Process","Calling Collect");
2423  fProof->Collect();
2424 
2425  HandleTimer(0); // force an update of final result
2426  // This forces a last call to TPacketizer::HandleTimer via the second argument
2427  // (the first is ignored). This is needed when some events were skipped so that
2428  // the total number of entries is not the one requested. The packetizer has no
2429  // way in such a case to understand that processing is finished: it must be told.
2430  if (fPacketizer) {
2431  fPacketizer->StopProcess(kFALSE, kTRUE);
2432  // The progress timer will now stop itself at the next call
2434  // Store process info
2435  elapsed.Stop();
2436  if (fQuery)
2437  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2439  elapsed.RealTime());
2440  }
2441  StopFeedback();
2442 
2443  return Finalize(kFALSE,sync);
2444  }
2445  } else {
2446 
2447  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2448  fProof->Collect();
2449  if (!(fProof->IsSync())) {
2450  // The server required to switch to asynchronous mode
2451  Info("Process", "switching to the asynchronous mode ...");
2452  return fProof->fSeqNum;
2453  }
2454 
2455  // Restore prompt logging, for clients (Collect leaves things as they were
2456  // at the time it was called)
2457  if (IsClient())
2458  fProof->fRedirLog = kFALSE;
2459 
2460  if (!IsClient()) {
2461  // Force an update of final result
2462  HandleTimer(0);
2463  // This forces a last call to TPacketizer::HandleTimer via the second argument
2464  // (the first is ignored). This is needed when some events were skipped so that
2465  // the total number of entries is not the one requested. The packetizer has no
2466  // way in such a case to understand that processing is finished: it must be told.
2467  if (fPacketizer) {
2468  fPacketizer->StopProcess(kFALSE, kTRUE);
2469  // The progress timer will now stop itself at the next call
2471  // Store process info
2472  if (fQuery)
2473  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2476  }
2477  } else {
2478  // Set the input list: maybe required at termination
2479  if (!fCreateSelObj) fSelector->SetInputList(fInput);
2480  }
2481  StopFeedback();
2482 
2483  Long64_t rc = -1;
2485  rc = Finalize(kFALSE,sync);
2486 
2487  // Remove temporary input objects, if any
      // (only detached from fInput: the bookkeeping list does not own them)
2488  if (inputtmp) {
2489  TIter nxi(inputtmp);
2490  TObject *o = 0;
2491  while ((o = nxi())) fInput->Remove(o);
2492  SafeDelete(inputtmp);
2493  }
2494 
2495  // Done
2496  return rc;
2497  }
2498 }
2499 
2500 ////////////////////////////////////////////////////////////////////////////////
2501 /// Process specified TDSet on PROOF.
2502 /// This method is called on client and on the PROOF master.
2503 /// The return value is -1 in case of an error and TSelector::GetStatus()
2504 /// in case of success.
2505 
2507  Option_t *option, Long64_t nentries,
2508  Long64_t first)
2509 {
      // Overload taking an already instantiated selector object (the first line of
      // the signature is not present in this listing). Returns -1 when no selector
      // is given; otherwise the result of the name-based Process() called with
      // selector->ClassName() as selector name.
2510  if (!selector) {
2511  Error("Process", "selector object undefined");
2512  return -1;
2513  }
2514 
2515  // Define fSelector in Client
      // A previously held selector is deleted only if fCreateSelObj says it was
      // created internally
2516  if (IsClient() && (selector != fSelector)) {
2517  if (fCreateSelObj) SafeDelete(fSelector);
2518  fSelector = selector;
2519  }
2520 
      // Delegate to the name-based overload; afterwards fCreateSelObj is set back
      // to kTRUE
2522  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2523  fCreateSelObj = kTRUE;
2524 
2525  // Done
2526  return rc;
2527 }
2528 
2529 ////////////////////////////////////////////////////////////////////////////////
2530 /// Prepares the given list of new workers to join a progressing process.
2531 /// Returns kTRUE on success, kFALSE otherwise.
2532 
2534 {
      // The signature line is not present in this listing; the body uses a
      // 'workers' collection (GetEntries()/GetSize() are called on it).
      // Returns kFALSE if the stored process message, the proof session or the
      // packetizer is missing, if 'workers' is null or this is not a master, if
      // sending the selector file fails, or if the packetizer rejects the new
      // workers; kTRUE once the stored process message has been broadcast to them.
      // Note that all messages in this function are tagged "Process".
2535  if (!fProcessMessage || !fProof || !fPacketizer) {
2536  Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2538  return kFALSE;
2539  }
2540 
2541  if (!workers || !fProof->IsMaster()) {
2542  Error("Process", "Invalid call");
2543  return kFALSE;
2544  }
2545 
2546  PDB(kGlobal, 1)
2547  Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2548 
2549  // Sends the file associated to the TSelector, if necessary
      // (the line performing the send is not visible in this listing)
2550  if (fCreateSelObj) {
2551  PDB(kGlobal, 2)
2552  Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2554  Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2555  return kFALSE;
2556  }
2557  }
2558 
      // In PROOF-Lite the new workers are counted as not idle
2559  if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2560 
2561  PDB(kGlobal, 2)
2562  Info("Process", "Adding new workers to the packetizer");
2563  if (fPacketizer->AddWorkers(workers) == -1) {
2564  Error("Process", "Cannot add new workers to the packetizer!");
2565  return kFALSE; // TODO: make new wrks inactive
2566  }
2567 
2568  PDB(kGlobal, 2)
2569  Info("Process", "Broadcasting process message to new workers");
2570  fProof->Broadcast(*fProcessMessage, workers);
2571 
2572  // Don't call Collect(): we came here from a global Collect() already which
2573  // will take care of new workers as well
2574 
2575  return kTRUE;
2576 
2577 }
2578 
2579 ////////////////////////////////////////////////////////////////////////////////
2580 /// Merge output in files
2581 
2583 {
      // Walks the output list and, when fMergeFiles is set, processes every
      // TProofOutputFile found in it:
      //  - in merge mode (IsMerge()) the partial files are merged into the final
      //    output file through the associated TFileMerger and the merged inputs
      //    are unlinked;
      //  - otherwise the associated file collection is moved to the output list
      //    and the TProofOutputFile object is scheduled for deletion.
      // The function returns kTRUE in all cases: failures on individual entries
      // are reported with Error() and the entry is skipped.
      // (The signature line and a few statements are not present in this listing.)
2584  PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2585  PDB(kOutput,2) fOutput->ls();
2586 
      // Objects to be deleted once the loop over fOutput is over
2587  TList *rmList = 0;
2588  if (fMergeFiles) {
2589  TIter nxo(fOutput);
2590  TObject *o = 0;
2591  TProofOutputFile *pf = 0;
2592  while ((o = nxo())) {
2593  if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2594 
2595  PDB(kOutput,2) pf->Print();
2596 
2597  if (pf->IsMerge()) {
2598 
2599  // Point to the merger
2600  Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2601  TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2602  if (!filemerger) {
2603  Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2604  pf->Print();
2605  continue;
2606  }
2607  // If only one instance the list in the merger is not yet created: do it now
2608  if (!pf->IsMerged()) {
2609  PDB(kOutput,2) pf->Print();
2610  TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2611  filemerger->AddFile(fileLoc);
2612  }
2613  // Datadir
2614  TString ddir, ddopts;
2615  if (gProofServ) {
2616  ddir.Form("%s/", gProofServ->GetDataDir());
2618  }
2619  // Set the output file
      // (the declaration of 'outfile' is not visible in this listing);
      // the "<datadir>/" placeholder is expanded to the data directory
2621  if (outfile.Contains("<datadir>/")) {
2622  outfile.ReplaceAll("<datadir>/", ddir.Data());
2623  if (!ddopts.IsNull())
2624  outfile += TString::Format("?%s", ddopts.Data());
2626  }
      // On the top master, or in PROOF-Lite, work out the name under which the
      // client will see the file
2627  if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2629  TString srv;
2631  TUrl usrv(srv);
2632  Bool_t localFile = kFALSE;
2633  if (pf->IsRetrieve()) {
2634  // This file will be retrieved by the client: we created it in the data dir
2635  // and save the file URL on the client in the title
2636  if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2637  TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2638  // The output file path on the master
2639  outfile.Form("%s%s", ddir.Data(), bn.Data());
2640  // Save the client path in the title if not defined yet
2641  if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2642  // The file is local
2643  localFile = kTRUE;
2644  } else {
2645  // Check if the file is on the master or elsewhere
2646  if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2647  // Check locality
2648  TUrl uof(outfile.Data(), kTRUE);
2649  TString lfn;
2650  ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2651  if (ftyp == TFile::kLocal && !srv.IsNull()) {
2652  // Check if is a different server
2653  if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2654  usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2655  }
2656  // If it is really local set the file name
2657  if (ftyp == TFile::kLocal) outfile = lfn;
2658  // The file maybe local
2659  if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2660  }
2661  // The remote output file name (the one to be used by the client)
2662  TString outfilerem(outfile);
2663  // For local files we add the local server
2664  if (localFile) {
2665  // Remove prefix, if any, if included and if Xrootd
2666  TProofServ::FilterLocalroot(outfilerem, srv);
2667  outfilerem.Insert(0, srv);
2668  }
2669  // Save the new remote output filename
2670  pf->SetOutputFileName(outfilerem);
2671  // Align the filename
2672  pf->SetFileName(gSystem->BaseName(outfilerem));
2673  }
2674  if (!filemerger->OutputFile(outfile)) {
2675  Error("MergeOutputFiles", "cannot open the output file");
2676  continue;
2677  }
2678  // Merge
2679  PDB(kSubmerger,2) filemerger->PrintFiles("");
2680  if (!filemerger->Merge()) {
2681  Error("MergeOutputFiles", "cannot merge the output files");
2682  continue;
2683  }
2684  // Remove the files
      // ("file" protocol URLs are unlinked by path, anything else by full name)
2685  TList *fileList = filemerger->GetMergeList();
2686  if (fileList) {
2687  TIter next(fileList);
2688  TObjString *url = 0;
2689  while((url = (TObjString*)next())) {
2690  TUrl u(url->GetName());
2691  if (!strcmp(u.GetProtocol(), "file")) {
2692  gSystem->Unlink(u.GetFile());
2693  } else {
2694  gSystem->Unlink(url->GetName());
2695  }
2696  }
2697  }
2698  // Reset the merger
2699  filemerger->Reset();
2700 
2701  } else {
2702 
2703  // If not yet merged (for example when having only 1 active worker,
2704  // we need to create the dataset by calling Merge on an effectively empty list
2705  if (!pf->IsMerged()) {
2706  TList dumlist;
2707  dumlist.Add(new TNamed("dum", "dum"));
2708  dumlist.SetOwner(kTRUE);
2709  pf->Merge(&dumlist);
2710  }
2711  // Point to the dataset
      // (the declaration of 'fc' is not visible in this listing)
2713  if (!fc) {
2714  Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2715  pf->Print();
2716  continue;
2717  }
2718  // Add the collection to the output list for registration and/or to be returned
2719  // to the client
      // NOTE(review): fOutput is modified here (and a few lines below) while it
      // is being iterated with 'nxo' - confirm the iterator stays valid.
2720  fOutput->Add(fc);
2721  // Do not cleanup at destruction
2722  pf->ResetFileCollection();
2723  // Tell the main thread to register this dataset, if needed
2724  if (pf->IsRegister()) {
2725  TString opt;
2726  if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2727  if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2728  if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2729  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2730  TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2731  fOutput->Add(new TNamed(tag, opt));
2732  }
2733  // Remove this object from the output list and schedule it for destruction
2734  fOutput->Remove(pf);
2735  if (!rmList) rmList = new TList;
2736  rmList->Add(pf);
2737  PDB(kOutput,2) fOutput->Print();
2738  }
2739  }
2740  }
2741  }
2742 
2743  // Remove objects scheduled for removal
      // (the removal list takes ownership so that deleting it deletes the objects).
      // NOTE(review): an empty 'rmList' would not be deleted here; in the visible
      // code it is only allocated right before an Add(), so this looks unreachable.
2744  if (rmList && rmList->GetSize() > 0) {
2745  TIter nxo(rmList);
2746  TObject *o = 0;
2747  while((o = nxo())) {
2748  fOutput->Remove(o);
2749  }
2750  rmList->SetOwner(kTRUE);
2751  delete rmList;
2752  }
2753 
2754  PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2755 
2756  // Done
2757  return kTRUE;
2758 }
2759 
2760 
2761 ////////////////////////////////////////////////////////////////////////////////
2762 /// Set the selector's data members:
2763 /// find the mapping of data members to output list entries in the output list
2764 /// and apply it.
2765 
2767 {
      // The signature and the statement that obtains 'olsdm' are not present in
      // this listing. When no map object is available the function only warns
      // (at debug level) and returns; otherwise the map is applied to fSelector.
2770  if (!olsdm) {
2771  PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2772  "failed to find map object in output list!");
2773  return;
2774  }
2775 
2776  olsdm->SetDataMembers(fSelector);
2777 }
2778 
2779 ////////////////////////////////////////////////////////////////////////////////
2780 
2782 {
2783  // Finalize a query.
2784  // Returns -1 in case of an error, 0 otherwise.
      // More precisely, as far as the code below shows: on a client with no
      // pending output lists, 'force' set and a current query, the call is
      // delegated to fProof->Finalize() and its result returned; on the client,
      // after a successful Terminate(), the return value is fSelector->GetStatus();
      // on the master and for aborted queries it is 0.
      // (The signature line is not present in this listing; the body uses the
      // flags 'force' and 'sync'.)
2785 
2786  if (IsClient()) {
2787  if (fOutputLists == 0) {
2788  if (force)
2789  if (fQuery)
2790  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2791  fQuery->GetName()), force);
2792  } else {
2793  // Make sure that all objects are in the output list
2794  PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2795  MergeOutput();
2796  }
2797  }
2798 
2799  Long64_t rv = 0;
2800  if (fProof->IsMaster()) {
2801 
2802  // Fill information for monitoring and stop it
2803  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2804  if (!status) {
2805  // The query was aborted: let's add some info in the output list
2806  status = new TStatus();
2807  fOutput->Add(status);
2808  TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2809  status->Add(emsg);
2810  }
2811  status->SetExitStatus((Int_t) GetExitStatus());
2812 
2813  PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2814  // Some objects (e.g. histos in autobin) may not have been merged yet
2815  // do it now
2816  MergeOutput();
2817 
2818  fOutput->SetOwner();
2819 
2820  // Add the active-wrks-vs-proctime info from the packetizer
2821  if (fPacketizer) {
2822  TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
2823  if (pperf) fOutput->Add(pperf);
2824  TList *parms = fPacketizer->GetConfigParams(kTRUE);
2825  if (parms) {
2826  TIter nxo(parms);
2827  TObject *o = 0;
2828  while ((o = nxo())) fOutput->Add(o);
2829  }
2830 
2831  // If other invalid elements were found during processing, add them to the
2832  // list of missing elements
      // (the declarations of 'type' and of the iterator 'nxe' are not visible
      // in this listing)
2833  TDSetElement *elem = 0;
2834  if (fPacketizer->GetFailedPackets()) {
2836  TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2837  if (!listOfMissingFiles) {
2838  listOfMissingFiles = new TList;
2839  listOfMissingFiles->SetName("MissingFiles");
2840  }
2842  while ((elem = (TDSetElement *)nxe()))
2843  listOfMissingFiles->Add(elem->GetFileInfo(type));
2844  if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2845  }
2846  }
2847 
2848  TPerfStats::Stop();
2849  // Save memory usage on master
2850  Long_t vmaxmst, rmaxmst;
2851  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2852  status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2853 
2854  SafeDelete(fSelector);
2855 
2856  } else {
2857  if (fExitStatus != kAborted) {
2858 
2859  if (!sync) {
2860  // Reinit selector (with multi-sessioning we must do this until
2861  // TSelector::GetSelector() is optimized to i) avoid reloading of an
2862  // unchanged selector and ii) invalidate existing instances of
2863  // reloaded selector)
      // NOTE(review): fQuery is dereferenced in the message below although it is
      // checked for null further down - confirm it cannot be null on this path.
2864  if (ReinitSelector(fQuery) == -1) {
2865  Info("Finalize", "problems reinitializing selector \"%s\"",
2866  fQuery->GetSelecImp()->GetName());
2867  return -1;
2868  }
2869  }
2870 
      // Expose the packets that could not be processed, with a note in the
      // status object
2871  if (fPacketizer)
2872  if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2874  failedPackets->SetName("FailedPackets");
2875  AddOutputObject(failedPackets);
2876 
2877  TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2878  if (!status) AddOutputObject((status = new TStatus()));
2879  status->Add("Some packets were not processed! Check the the"
2880  " 'FailedPackets' list in the output list");
2881  }
2882 
2883  // Some input parameters may be needed in Terminate
2884  fSelector->SetInputList(fInput);
2885 
      // Hand the merged objects to the selector's output list. In non-parallel
      // mode objects are first passed to DrawCanvas() and added only when it
      // returns 1
2886  TList *output = fSelector->GetOutputList();
2887  if (output) {
2888  TIter next(fOutput);
2889  while(TObject* obj = next()) {
2890  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2891  // Either parallel or not a canvas or not able to display it:
2892  // just add to the list
2893  output->Add(obj);
2894  }
2895  } else {
2896  Warning("Finalize", "undefined output list in the selector! Protocol error?");
2897  }
2898 
2899  // We need to do this because the output list can be modified in TSelector::Terminate
2900  // in a way to invalidate existing objects; so we clean the links when still valid and
2901  // we re-copy back later
2902  fOutput->SetOwner(kFALSE);
2903  fOutput->Clear("nodelete");
2904 
2905  // Map output objects to selector members
2907 
2908  PDB(kLoop,1) Info("Finalize","Call Terminate()");
2909  // This is the end of merging
2910  SetMerging(kFALSE);
2911  // We measure the merge time
2912  fProof->fQuerySTW.Reset();
2913  // Call Terminate now
2914  fSelector->Terminate();
2915 
2916  rv = fSelector->GetStatus();
2917 
2918  // Copy the output list back and clean the selector's list
      // NOTE(review): 'output' may be null here (see the warning branch above);
      // this relies on TIter accepting a null collection - confirm.
2919  TIter it(output);
2920  while(TObject* o = it()) {
2921  fOutput->Add(o);
2922  }
2923 
2924  // Save the output list in the current query, if any
2925  if (fQuery) {
2926  fQuery->SetOutputList(fOutput);
2927  // Set in finalized state (cannot be done twice)
2928  fQuery->SetFinalized();
2929  } else {
2930  Warning("Finalize","current TQueryResult object is undefined!");
2931  }
2932 
      // A selector that was not created internally is only detached from the
      // lists and from this player, not deleted
2933  if (!fCreateSelObj) {
2934  fInput->Remove(fSelector);
2935  fOutput->Remove(fSelector);
2936  if (output) output->Remove(fSelector);
2937  fSelector = 0;
2938  }
2939 
2940  // We have transferred copy of the output objects in TQueryResult,
2941  // so now we can cleanup the selector, making sure that we do not
2942  // touch the output objects
2943  if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2944  if (fCreateSelObj) SafeDelete(fSelector);
2945 
2946  // Delete fOutput (not needed anymore, cannot be finalized twice),
2947  // making sure that the objects saved in TQueryResult are not deleted
2948  fOutput->SetOwner(kFALSE);
2949  fOutput->Clear("nodelete");
2950  SafeDelete(fOutput);
2951 
2952  } else {
2953 
2954  // Cleanup
      // NOTE(review): here the selector is deleted regardless of fCreateSelObj,
      // whereas the non-aborted branch above only detaches it when
      // !fCreateSelObj - confirm the intended ownership.
2955  fOutput->SetOwner();
2956  SafeDelete(fSelector);
2957  if (!fCreateSelObj) fSelector = 0;
2958  }
2959  }
2960  PDB(kGlobal,1) Info("Process","exit");
2961 
      // NOTE(review): gProofServ is dereferenced without a null check; presumably
      // it is always set when this is not a client - confirm.
2962  if (!IsClient()) {
2963  Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2964  }
2966 
2967  return rv;
2968 }
2969 
2970 ////////////////////////////////////////////////////////////////////////////////
2971 /// Finalize the results of a query already processed.
2972 
2974 {
      // Client-only entry point working on a query result 'qr' (the signature
      // line is not present in this listing). Returns -1 when called on a
      // non-client, when 'qr' is null or already finalized, or when the query has
      // no output list; otherwise the value returned by Finalize() after the
      // query's output objects have been cloned into this player.
2975  PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2976 
2977  if (!IsClient()) {
2978  Info("Finalize(TQueryResult *)",
2979  "method to be executed only on the clients");
2980  return -1;
2981  }
2982 
2983  if (!qr) {
2984  Info("Finalize(TQueryResult *)", "query undefined");
2985  return -1;
2986  }
2987 
2988  if (qr->IsFinalized()) {
2989  Info("Finalize(TQueryResult *)", "query already finalized");
2990  return -1;
2991  }
2992 
2993  // Reset the list
2994  if (!fOutput)
2995  fOutput = new THashList;
2996  else
2997  fOutput->Clear();
2998 
2999  // Make sure that the temporary output list is empty
3000  if (fOutputLists) {
3001  fOutputLists->Delete();
3002  delete fOutputLists;
3003  fOutputLists = 0;
3004  }
3005 
3006  // Re-init the selector
      // (the statement(s) doing this are not visible in this listing; presumably
      // output redirection is started there, since it is reset below - confirm)
3008 
3009  // Import the output list
3010  TList *tmp = (TList *) qr->GetOutputList();
3011  if (!tmp) {
3012  gSystem->RedirectOutput(0);
3013  Info("Finalize(TQueryResult *)", "outputlist is empty");
3014  return -1;
3015  }
      // Objects are cloned, so the query result keeps its own copies. For
      // protocol < 11 the clones go to a separate list handed to StoreOutput();
      // otherwise they are added directly to fOutput
3016  TList *out = fOutput;
3017  if (fProof->fProtocol < 11)
3018  out = new TList;
3019  TIter nxo(tmp);
3020  TObject *o = 0;
3021  while ((o = nxo()))
3022  out->Add(o->Clone());
3023 
3024  // Adopts the list
      // NOTE(review): the temporary list is not deleted in the visible code;
      // presumably StoreOutput() takes it over - confirm.
3025  if (fProof->fProtocol < 11) {
3026  out->SetOwner();
3027  StoreOutput(out);
3028  }
3029  gSystem->RedirectOutput(0);
3030 
3032 
3033  // Finalize it
3034  SetCurrentQuery(qr);
3035  Long64_t rc = Finalize();
3037 
3038  return rc;
3039 }
3040 
3041 ////////////////////////////////////////////////////////////////////////////////
3042 /// Send the selector file(s) to master or worker nodes.
3043 
/// \param selector_file  selector implementation file name, possibly carrying ACLiC
///                       mode, arguments and I/O redirection (these are split off
///                       before use); a name without a '.' in its base name is
///                       taken as a class to be loaded from a library.
/// \return kTRUE when nothing has to be sent or when the files were sent; kFALSE if
///         the name is null, if no header file (.h or .hh) is readable, or if sending
///         either file fails.
3044 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
3045 {
3046  // Check input
3047  if (!selector_file) {
3048  Info("SendSelector", "Invalid input: selector (file) name undefined");
3049  return kFALSE;
3050  }
3051 
      // No extension in the base name: there is no file to ship
3052  if (!strchr(gSystem->BaseName(selector_file), '.')) {
3053  if (gDebug > 1)
3054  Info("SendSelector", "selector name '%s' does not contain a '.':"
3055  " nothing to send, it will be loaded from a library", selector_file);
3056  return kTRUE;
3057  }
3058 
3059  // Extract the file name first
3060  TString selec = selector_file;
3061  TString aclicMode;
3062  TString arguments;
3063  TString io;
3064  selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3065 
3066  // Expand possible envs or '~'
3067  gSystem->ExpandPathName(selec);
3068 
3069  // Update the macro path
      // (the declaration of 'mp' is not visible in this listing). The selector's
      // directory is inserted into the macro path unless already there, after a
      // leading ".:" if the path starts with one
3071  TString np(gSystem->DirName(selec));
3072  if (!np.IsNull()) {
3073  np += ":";
3074  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3075  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3076  mp.Insert(ip, np);
3077  TROOT::SetMacroPath(mp);
3078  if (gDebug > 0)
3079  Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3080  }
3081  }
3082 
3083  // Header file
      // Same path as the implementation file with the extension replaced by ".h",
      // falling back to ".hh" (a true return from AccessPathName is treated here
      // as "not readable")
3084  TString header = selec;
3085  header.Remove(header.Last('.'));
3086  header += ".h";
3087  if (gSystem->AccessPathName(header, kReadPermission)) {
3088  TString h = header;
3089  header.Remove(header.Last('.'));
3090  header += ".hh";
3091  if (gSystem->AccessPathName(header, kReadPermission)) {
3092  Info("SendSelector",
3093  "header file not found: tried: %s %s", h.Data(), header.Data());
3094  return kFALSE;
3095  }
3096  }
3097 
3098  // Send files now
      // (the statement sending the implementation file is not visible in this
      // listing; only its failure branch is)
3100  Info("SendSelector", "problems sending implementation file %s", selec.Data());
3101  return kFALSE;
3102  }
3103  if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3104  Info("SendSelector", "problems sending header file %s", header.Data());
3105  return kFALSE;
3106  }
3107 
3108  return kTRUE;
3109 }
3110 
3111 ////////////////////////////////////////////////////////////////////////////////
3112 /// Merge objects in the output lists.
     /// For every named list in fOutputLists a reference object is chosen (the
     /// object with the same name already in fOutput, or else the first element
     /// of the list, which is moved to fOutput) and the rest of the list is
     /// passed to its Merge(TCollection*) method, located via TMethodCall. When
     /// no such method exists the remaining objects are moved to fOutput one by
     /// one. Afterwards the TProofOutputFile objects found in fOutput get their
     /// directory and file-name information refreshed.
     /// NOTE(review): the signature and several body lines are absent from this
     /// listing (gaps in the line numbers); 'saveMemValues' is presumably a
     /// parameter - confirm against the real source file.
3113 
3115 {
3116  PDB(kOutput,1) Info("MergeOutput","Enter");
3117 
3118  TObject *obj = 0;
3119  if (fOutputLists) {
3120 
3122 
3123  TList *list;
3124  while ( (list = (TList *) next()) ) {
3125 
      // No object with this name in fOutput yet: promote the first list element
3126  if (!(obj = fOutput->FindObject(list->GetName()))) {
3127  obj = list->First();
3128  list->Remove(obj);
3129  fOutput->Add(obj);
3130  }
3131 
3132  if ( list->IsEmpty() ) continue;
3133 
3134  TMethodCall callEnv;
3135  if (obj->IsA())
3136  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3137  if (callEnv.IsValid()) {
3138  callEnv.SetParam((Long_t) list);
3139  callEnv.Execute(obj);
3140  } else {
3141  // No Merge interface, return individual objects
3142  while ( (obj = list->First()) ) {
3143  fOutput->Add(obj);
3144  list->Remove(obj);
3145  }
3146  }
3147  }
3149 
3150  } else {
3151 
3152  PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3153  }
3154 
3155  if (!IsClient() || fProof->IsLite()) {
3156  // Merge the output files created on workers, if any
3157  MergeOutputFiles();
3158  }
3159 
3160  // If there are TProofOutputFile objects we have to make sure that the internal
3161  // information is consistent for the cases where this object is going to be merged
3162  // again (e.g. when using submergers or in a multi-master setup). This may not be
3163  // the case because the first coming in is taken as reference and it has the
3164  // internal dir and raw dir of the originating worker.
3165  TString key;
3166  TNamed *nm = 0;
      // Temporary "PROOF_OutputFileName_<file>" entries to be dropped from fOutput
3167  TList rmlist;
3168  TIter nxo(fOutput);
3169  while ((obj = nxo())) {
3170  TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3171  if (pf) {
3172  if (gProofServ) {
3173  PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3174  TString dir(pf->GetOutputFileName());
3175  PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3176  // The dir: everything up to and including the last '/'
3177  if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3178  PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3179  pf->SetDir(dir);
3180  // The raw dir; for xrootd based system we include the 'localroot', if any
3181  TUrl u(dir);
3182  dir = u.GetFile();
3183  TString pfx = gEnv->GetValue("Path.Localroot","");
3184  if (!pfx.IsNull() &&
3185  (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3186  dir.Insert(0, pfx);
3187  PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3188  pf->SetDir(dir, kTRUE);
3189  // The worker ordinal
3191  // The saved output file name, if any
3192  key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3193  if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3194  pf->SetOutputFileName(nm->GetTitle());
3195  rmlist.Add(nm);
3197  pf->SetOutputFileName(0);
3199  }
3200  // The filename (order is important to exclude '.merger' from the key)
3201  dir = pf->GetFileName();
3203  dir += ".merger";
3204  pf->SetMerged(kFALSE);
3205  } else {
3206  if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3207  }
3208  pf->SetFileName(dir);
3209  } else if (fProof->IsLite()) {
3210  // The ordinal
3211  pf->SetWorkerOrdinal("0");
3212  // The dir
3213  pf->SetDir(gSystem->DirName(pf->GetOutputFileName()));
3214  // The filename and raw dir
3215  TUrl u(pf->GetOutputFileName(), kTRUE);
3216  pf->SetFileName(gSystem->BaseName(u.GetFile()));
3217  pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
3218  // Notify the output path
3219  Printf("\nOutput file: %s", pf->GetOutputFileName());
3220  }
3221  } else {
3222  PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3223  }
3224  }
3225 
3226  // Remove temporary objects from fOutput; rmlist is then made owner of them
3227  if (rmlist.GetSize() > 0) {
3228  TIter nxrm(&rmlist);
3229  while ((obj = nxrm()))
3230  fOutput->Remove(obj);
3231  rmlist.SetOwner(kTRUE);
3232  }
3233 
3234  // If requested (typically in case of submerger to count possible side-effects in that process)
3235  // save the measured memory usage
3236  if (saveMemValues) {
3237  TPerfStats::Stop();
3238  // Save memory usage on master
3239  Long_t vmaxmst, rmaxmst;
3240  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3241  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3242  if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3243  }
3244 
3245  PDB(kOutput,1) fOutput->Print();
3246  PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3247 }
3248 
3249 ////////////////////////////////////////////////////////////////////////////////
3250 /// Progress signal.
     /// On the client 'total' and 'processed' are forwarded to fProof->Progress();
     /// otherwise they are streamed into the message 'm' and sent over the
     /// socket returned by gProofServ->GetSocket().
     /// NOTE(review): the signature and the declaration of 'm' (listing line
     /// 3258) are missing from this listing.
3251 
3253 {
3254  if (IsClient()) {
3255  fProof->Progress(total, processed);
3256  } else {
3257  // Send to the previous tier
3259  m << total << processed;
3260  gProofServ->GetSocket()->Send(m);
3261  }
3262 }
3263 
3264 ////////////////////////////////////////////////////////////////////////////////
3265 /// Progress signal.
     /// Extended variant carrying also the bytes read, the init and processing
     /// times and two further values named 'evtrti' and 'mbrti'. On the client
     /// everything is forwarded to fProof->Progress(); otherwise the values are
     /// streamed, in the same order, into the message 'm' and sent over the
     /// socket returned by gProofServ->GetSocket().
     /// NOTE(review): the first line of the signature and the declaration of
     /// 'm' (listing line 3280) are missing from this listing.
3266 
3268  Long64_t bytesread,
3269  Float_t initTime, Float_t procTime,
3270  Float_t evtrti, Float_t mbrti)
3271 {
3272  PDB(kGlobal,1)
3273  Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3274  initTime, procTime, evtrti, mbrti);
3275 
3276  if (IsClient()) {
3277  fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3278  } else {
3279  // Send to the previous tier
3281  m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3282  gProofServ->GetSocket()->Send(m);
3283  }
3284 }
3285 
3286 ////////////////////////////////////////////////////////////////////////////////
3287 /// Progress signal.
     /// Variant taking the progress information packed in the object 'pi'. A
     /// null 'pi' only triggers a warning. On the client the individual fields
     /// are forwarded to fProof->Progress(); otherwise 'pi' itself is streamed
     /// into the message 'm' and sent over gProofServ's socket.
     /// NOTE(review): the signature and the declaration of 'm' (listing line
     /// 3304) are missing from this listing.
3288 
3290 {
3291  if (pi) {
3292  PDB(kGlobal,1)
3293  Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3294  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3295  pi->fActWorkers, pi->fEffSessions);
3296 
3297  if (IsClient()) {
3298  fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3299  pi->fInitTime, pi->fProcTime,
3300  pi->fEvtRateI, pi->fMBRateI,
3301  pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3302  } else {
3303  // Send to the previous tier
3305  m << pi;
3306  gProofServ->GetSocket()->Send(m);
3307  }
3308  } else {
3309  Warning("Progress","TProofProgressInfo object undefined!");
3310  }
3311 }
3312 
3313 
3314 ////////////////////////////////////////////////////////////////////////////////
3315 /// Feedback signal.
     /// Forwards 'objs' unchanged to fProof->Feedback().
     /// NOTE(review): the signature is missing from this listing.
3316 
3318 {
3319  fProof->Feedback(objs);
3320 }
3321 
3322 ////////////////////////////////////////////////////////////////////////////////
3323 /// Stop process after this event.
     /// The request is first passed to the packetizer, when one exists, and then
     /// handled differently for 'abort' equal to kTRUE and otherwise.
     /// NOTE(review): the signature and the statements of both branches
     /// (listing lines 3330 and 3332) are missing from this listing.
3324 
3326 {
3327  if (fPacketizer != 0)
3328  fPacketizer->StopProcess(abort, kFALSE);
3329  if (abort == kTRUE)
3331  else
3333 }
3334 
3335 ////////////////////////////////////////////////////////////////////////////////
3336 /// Incorporate the received object 'obj' into the output list fOutput.
3337 /// The latter is created if not existing.
3338 /// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3339 /// consumption.
3340 /// Returns -1 in case of error, 1 if the object has been merged into another
3341 /// one (so that its ownership has not been taken and can be deleted), and 0
3342 /// otherwise.
     /// A TList named "PROOF_EventListsList" is converted into a single
     /// "PROOF_EventList" which is incorporated in its place; in that case 1 is
     /// always returned.
     /// NOTE(review): the signature and a few body lines are missing from this
     /// listing (gaps in the line numbers).
3343 
3345 {
3346  PDB(kOutput,1)
3347  Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3348 
3349  // We must have something to process
3350  if (!obj) {
3351  PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3352  return -1;
3353  }
3354 
3355  // Create the output list, if not yet done
3356  if (!fOutput)
3357  fOutput = new THashList;
3358 
3359  // Flag about merging
3360  Bool_t merged = kTRUE;
3361 
3362  // Process event lists first
3363  TList *elists = dynamic_cast<TList *> (obj);
3364  if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3365 
3366  // Create a global event list, result of merging the event lists
3367  // corresponding to the various data set elements
3368  TEventList *evlist = new TEventList("PROOF_EventList");
3369 
3370  // Iterate the list of event list segments
3371  TIter nxevl(elists);
3372  TEventList *evl = 0;
3373  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3374 
3375  // Find the file offset (fDSet is the current TDSet instance)
3376  // locating the element by name
3377  TIter nxelem(fDSet->GetListOfElements());
3378  TDSetElement *elem = 0;
3379  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3380  if (!strcmp(elem->GetFileName(), evl->GetName()))
3381  break;
3382  }
3383  if (!elem) {
3384  Error("AddOutputObject", "Found an event list for %s, but no object with"
3385  " the same name in the TDSet", evl->GetName());
3386  continue;
3387  }
3388  Long64_t offset = elem->GetTDSetOffset();
3389 
3390  // Shift the list by the number of first event in that file
3391  Long64_t *arr = evl->GetList();
3392  Int_t num = evl->GetN();
3393  if (arr && offset > 0)
3394  for (Int_t i = 0; i < num; i++)
3395  arr[i] += offset;
3396 
3397  // Add to the global event list
3398  evlist->Add(evl);
3399  }
3400 
3401  // Incorporate the resulting global list in fOutput
3402  SetLastMergingMsg(evlist);
3403  Incorporate(evlist, fOutput, merged);
3404  NotifyMemory(evlist);
3405 
3406  // Delete the global list if merged
3407  if (merged)
3408  SafeDelete(evlist);
3409 
3410  // The original object has been transformed into something else; we do
3411  // not have ownership on it
3412  return 1;
3413  }
3414 
3415  // Check if we need to merge files
3416  TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3417  if (pf) {
3418  fMergeFiles = kTRUE;
3419  if (!IsClient() || fProof->IsLite()) {
3420  if (pf->IsMerge()) {
      // The output file name is (re)set when none was explicitly set or when
      // this player is flagged as submerger
3421  Bool_t hasfout = (pf->GetOutputFileName() &&
3422  strlen(pf->GetOutputFileName()) > 0 &&
3423  pf->TestBit(TProofOutputFile::kOutputFileNameSet)) ? kTRUE : kFALSE;
3424  Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3425  if (setfout) {
3426 
3427  TString ddir, ddopts;
3428  if (gProofServ) {
3429  ddir.Form("%s/", gProofServ->GetDataDir());
3430  if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3431  }
3432  // Set the output file
3434  outfile.ReplaceAll("<datadir>/", ddir.Data());
3435  if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3437 
3438  if (gProofServ) {
3439  // If submerger, save first the existing filename, if any
3440  if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3441  TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3442  if (!fOutput->FindObject(key.Data()))
3443  fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3444  }
3445  TString of;
3447  if (of.IsNull()) {
3448  // Assume an xroot server running on the machine
3449  of.Form("root://%s/", gSystem->HostName());
3450  if (gSystem->Getenv("XRDPORT")) {
3451  TString sp(gSystem->Getenv("XRDPORT"));
3452  if (sp.IsDigit())
3453  of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3454  }
3455  }
3456  TString sessionPath(gProofServ->GetSessionDir());
3457  TProofServ::FilterLocalroot(sessionPath, of);
3458  of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3460  if (!of.EndsWith(".merger")) of += ".merger";
3461  } else {
3462  if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3463  }
3464  pf->SetOutputFileName(of);
3465  }
3466  }
3467  // Notify
3468  PDB(kOutput, 1) pf->Print();
3469  }
3470  } else {
3471  // On clients notify the output path
3472  Printf("Output file: %s", pf->GetOutputFileName());
3473  }
3474  }
3475 
3476  // For other objects we just run the incorporation procedure
3477  SetLastMergingMsg(obj);
3478  Incorporate(obj, fOutput, merged);
3479  NotifyMemory(obj);
3480 
3481  // We are done
3482  return (merged ? 1 : 0);
3483 }
3484 
3485 ////////////////////////////////////////////////////////////////////////////////
3486 /// Control output redirection to TProof::fLogFileW
     /// Acts when 'on' is true and fProof has a valid fLogFileW, or when 'on' is
     /// false and fErrorHandler is set.
     /// NOTE(review): the signature and the statements of both branches
     /// (listing lines 3491-3492 and 3495-3496) are missing from this listing.
3487 
3489 {
3490  if (on && fProof && fProof->fLogFileW) {
3493  } else if (!on) {
3494  if (fErrorHandler) {
3497  }
3498  }
3499 }
3500 
3501 ////////////////////////////////////////////////////////////////////////////////
3502 /// Incorporate the content of the received output list 'out' into the final
3503 /// output list fOutput. The latter is created if not existing.
3504 /// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3505 /// consumption.
     /// Objects that Incorporate() reports as not merged are removed from 'out',
     /// since they are then held by fOutput; merged objects stay in 'out'.
     /// NOTE(review): the signature is missing from this listing.
3506 
3508 {
3509  PDB(kOutput,1) Info("AddOutput","Enter");
3510 
3511  // We must have something to process
3512  if (!out) {
3513  PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3514  return;
3515  }
3516 
3517  // Create the output list, if not yet done
3518  if (!fOutput)
3519  fOutput = new THashList;
3520 
3521  // Process event lists first
3522  Bool_t merged = kTRUE;
3523  TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3524  if (elists) {
3525 
3526  // Create a global event list, result of merging the event lists
3527  // corresponding to the various data set elements
3528  TEventList *evlist = new TEventList("PROOF_EventList");
3529 
3530  // Iterate the list of event list segments
3531  TIter nxevl(elists);
3532  TEventList *evl = 0;
3533  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3534 
3535  // Find the file offset (fDSet is the current TDSet instance)
3536  // locating the element by name
3537  TIter nxelem(fDSet->GetListOfElements());
3538  TDSetElement *elem = 0;
3539  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3540  if (!strcmp(elem->GetFileName(), evl->GetName()))
3541  break;
3542  }
3543  if (!elem) {
3544  Error("AddOutput", "Found an event list for %s, but no object with"
3545  " the same name in the TDSet", evl->GetName());
3546  continue;
3547  }
3548  Long64_t offset = elem->GetTDSetOffset();
3549 
3550  // Shift the list by the number of first event in that file
3551  Long64_t *arr = evl->GetList();
3552  Int_t num = evl->GetN();
3553  if (arr && offset > 0)
3554  for (Int_t i = 0; i < num; i++)
3555  arr[i] += offset;
3556 
3557  // Add to the global event list
3558  evlist->Add(evl);
3559  }
3560 
3561  // Remove and delete the events lists object to avoid spoiling iteration
3562  // during next steps
3563  out->Remove(elists);
3564  delete elists;
3565 
3566  // Incorporate the resulting global list in fOutput
      // NOTE(review): unlike AddOutputObject, 'evlist' is not deleted here when
      // Incorporate() reports it as merged - possible leak, to be confirmed.
3567  SetLastMergingMsg(evlist);
3568  Incorporate(evlist, fOutput, merged);
3569  NotifyMemory(evlist);
3570  }
3571 
3572  // Iterate on the remaining objects in the received list
3573  TIter nxo(out);
3574  TObject *obj = 0;
3575  while ((obj = nxo())) {
3576  SetLastMergingMsg(obj);
3577  Incorporate(obj, fOutput, merged);
3578  // If not merged, drop from the temporary list, as the ownership
3579  // passes to fOutput
3580  if (!merged)
3581  out->Remove(obj);
3582  NotifyMemory(obj);
3583  }
3584 
3585  // Done
3586  return;
3587 }
3588 
3589 ////////////////////////////////////////////////////////////////////////////////
3590 /// Printout the memory record after merging object 'obj'
3591 /// This record is used by the memory monitor
     /// Nothing is done unless fProof is set and this is either not a client or
     /// a PROOF-Lite session.
     /// NOTE(review): 'obj' is dereferenced without a null check; the signature
     /// and listing lines 3600 and 3606 are missing from this listing.
3592 
3594 {
3595  if (fProof && (!IsClient() || fProof->IsLite())){
3596  ProcInfo_t pi;
3597  if (!gSystem->GetProcInfo(&pi)){
3598  // For PROOF-Lite we redirect this output to the open log file so that the
3599  // memory monitor can pick these messages up
3601  Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3602  pi.fMemVirtual, pi.fMemResident, obj->GetName());
3603  RedirectOutput(0);
3604  }
3605  // Record also values for monitoring
3607  }
3608 }
3609 
3610 ////////////////////////////////////////////////////////////////////////////////
3610 /// Set the message to be notified in case of exception
     /// The message has the form "while merging object '<name>'", with the name
     /// taken from 'obj', and is passed to TProofServ::SetLastMsg().
     /// NOTE(review): 'obj' is dereferenced without a null check; the signature
     /// is missing from this listing.
3611 
3613 {
3614  TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3615  TProofServ::SetLastMsg(lastMsg);
3616 }
3618 
3619 ////////////////////////////////////////////////////////////////////////////////
3619 /// Incorporate object 'newobj' in the list 'outlist'.
3620 /// The object is merged with an object of the same name already existing in
3621 /// the list, or just added.
3622 /// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3623 /// this happens if the Merge() method does not exist or if an object with the same
3624 /// name as 'newobj' is not already in the list. If the object is not 'merged' then
3625 /// it should not be deleted, unless outlist is not owner of its objects.
3626 /// Return 0 on success, -1 on error.
     /// NOTE(review): the signature is missing from this listing.
3627 
3629 {
3630  merged = kTRUE;
3631 
3632  PDB(kOutput,1)
3633  Info("Incorporate", "enter: obj: %p (%s), list: %p",
3634  newobj, newobj ? newobj->ClassName() : "undef", outlist);
3635 
3636  // The object and list must exist
3637  if (!newobj || !outlist) {
3638  Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3639  return -1;
3640  }
3641 
3642  // Special treatment for histograms in autobin mode; applied when there is no
      // fProof, when fProof is not flagged as client, or in PROOF-Lite
3643  Bool_t specialH =
3644  (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
3645  if (specialH && newobj->InheritsFrom(TH1::Class())) {
      // A null return means that the histogram has been dealt with completely
3646  if (!HandleHistogram(newobj, merged)) {
3647  if (merged) {
3648  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3649  } else {
3650  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3651  " appropriate list for delayed merging", newobj->GetName());
3652  }
3653  return 0;
3654  }
3655  }
3656 
3657  // Check if an object with the same name exists already
3658  TObject *obj = outlist->FindObject(newobj->GetName());
3659 
3660  // If no, add the new object and return
3661  if (!obj) {
3662  outlist->Add(newobj);
3663  merged = kFALSE;
3664  // Done
3665  return 0;
3666  }
3667 
3668  // Locate the Merge(TCollection *) method
3669  TMethodCall callEnv;
3670  if (obj->IsA())
3671  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3672  if (callEnv.IsValid()) {
3673  // Found: put the object in a one-element list (a function-level static,
      // allocated once and reused by all calls)
3674  static TList *xlist = new TList;
3675  xlist->Add(newobj);
3676  // Call the method
3677  callEnv.SetParam((Long_t) xlist);
3678  callEnv.Execute(obj);
3679  // Ready for next call
3680  xlist->Clear();
3681  } else {
3682  // Not found: return individual objects
3683  outlist->Add(newobj);
3684  merged = kFALSE;
3685  }
3686 
3687  // Done
3688  return 0;
3689 }
3691 
3692 ////////////////////////////////////////////////////////////////////////////////
3692 /// Low statistic histograms need a special treatment when using autobin
     /// Returns 'obj' itself when it is not a TH1, a null pointer when the
     /// histogram has been stored here (in a per-name list of fOutputLists or
     /// directly in fOutput), and the histogram 'h' after merging the existing
     /// output histogram into it in the 'one-by-one' case.
     /// NOTE(review): the signature and listing line 3720 are missing from this
     /// listing.
3693 
3695 {
3696  TH1 *h = dynamic_cast<TH1 *>(obj);
3697  if (!h) {
3698  // Not a histogram
3699  return obj;
3700  }
3701 
3702  // This is only used if we return (TObject *)0 and there is only one case
3703  // when we set this to kTRUE
3704  merged = kFALSE;
3705 
3706  // Does it still need binning? (i.e. does it still have a buffer)
3707  Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3708 
3709  // Number of entries
3710  Int_t nent = h->GetBufferLength();
3711  PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3712  h->GetName(), nent, h->GetBufferSize());
3713 
3714  // Attach to the list in the outputlists, if any
3715  TList *list = 0;
3716  if (!fOutputLists) {
3717  PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3718  fOutputLists = new TList;
3720  }
3721  list = (TList *) fOutputLists->FindObject(h->GetName());
3722 
3723  TH1 *href = 0;
3724  if (tobebinned) {
3725 
3726  // The histogram needs to be projected in a reasonable range: we
3727  // do this at the end with all the histos, so we need to create
3728  // a list here
3729  if (!list) {
3730  // Create the list
3731  list = new TList;
3732  list->SetName(h->GetName());
3733  list->SetOwner();
3734  fOutputLists->Add(list);
3735  // Move in it any previously merged object from the output list
3736  if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3737  fOutput->Remove(href);
3738  list->Add(href);
3739  }
3740  }
      // Insert before the first buffered histogram with a shorter buffer, or
      // append at the end when there is none
3741  TIter nxh(list);
3742  while ((href = (TH1 *) nxh())) {
3743  if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3744  }
3745  if (href) {
3746  list->AddBefore(href, h);
3747  } else {
3748  list->Add(h);
3749  }
3750  // Done
3751  return (TObject *)0;
3752 
3753  } else {
3754 
3755  if (list) {
      // Insert before the first histogram which is still buffered or has
      // fewer entries than 'nent', or append at the end when there is none
3756  TIter nxh(list);
3757  while ((href = (TH1 *) nxh())) {
3758  if (href->GetBuffer() || href->GetEntries() < nent) break;
3759  }
3760  if (href) {
3761  list->AddBefore(href, h);
3762  } else {
3763  list->Add(h);
3764  }
3765  // Done
3766  return (TObject *)0;
3767 
3768  } else {
3769  // Check if we can 'Add' the histogram to an existing one; this is more efficient
3770  // than using Merge
      // NOTE(review): fOutput is used here without the null check applied in
      // the 'tobebinned' branch above - confirm it is always set at this point.
3771  TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3772  if (hout) {
3773  // Remove the existing histo from the output list ...
3774  fOutput->Remove(hout);
3775  // ... and create either the list to merge in one-go at the end
3776  // (more efficient than merging one by one) or, if too big, merge
3777  // these two and start the 'one-by-one' technology
3778  Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3779  if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
      // Merge 'hout' into 'h'; the temporary owning list deletes 'hout'
3780  list = new TList;
3781  list->Add(hout);
3782  h->Merge(list);
3783  list->SetOwner();
3784  delete list;
3785  return h;
3786  } else {
3787  list = new TList;
3788  list->SetName(h->GetName());
3789  list->SetOwner();
3790  fOutputLists->Add(list);
3791  // Add the existing and the incoming histos
3792  list->Add(hout);
3793  list->Add(h);
3794  // Done
3795  return (TObject *)0;
3796  }
3797  } else {
3798  // This is the first one; add it to the output list
3799  fOutput->Add(h);
3800  return (TObject *)0;
3801  }
3802  }
3803  }
      // NOTE(review): every branch above returns, so this statement looks
      // unreachable in the visible code - confirm against the real source.
3804  PDB(kOutput,1) Info("HandleHistogram", "leaving");
3805 }
3807 
3808 ////////////////////////////////////////////////////////////////////////////////
3809 /// Return kTRUE if the histograms 'h0' and 'h1' have the same binning and ranges
3810 /// on the axis (i.e. if they can be just Add-ed for merging).
     /// Two axes are considered equal when they have the same number of bins and
     /// their minima and maxima differ by less than 1.e-9. kFALSE is returned
     /// when either histogram pointer is null.
     /// NOTE(review): only the dimension of 'h0' decides whether the Y and Z
     /// axes are checked; the signature is missing from this listing.
3811 
3813 {
3814  Bool_t rc = kFALSE;
3815  if (!h0 || !h1) return rc;
3816 
3817  TAxis *a0 = 0, *a1 = 0;
3818 
3819  // Check X
3820  a0 = h0->GetXaxis();
3821  a1 = h1->GetXaxis();
3822  if (a0->GetNbins() == a1->GetNbins())
3823  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3824  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3825 
3826  // Check Y, if needed
      // NOTE(review): 'rc' is reset here, so for more than one dimension the
      // result depends only on the last axis checked - confirm this is intended.
3827  if (h0->GetDimension() > 1) {
3828  rc = kFALSE;
3829  a0 = h0->GetYaxis();
3830  a1 = h1->GetYaxis();
3831  if (a0->GetNbins() == a1->GetNbins())
3832  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3833  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3834  }
3835 
3836  // Check Z, if needed
3837  if (h0->GetDimension() > 2) {
3838  rc = kFALSE;
3839  a0 = h0->GetZaxis();
3840  a1 = h1->GetZaxis();
3841  if (a0->GetNbins() == a1->GetNbins())
3842  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3843  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3844  }
3845 
3846  // Done
3847  return rc;
3848 }
3849 
3850 ////////////////////////////////////////////////////////////////////////////////
3851 /// Store received output list.
     /// Each object in 'out' is appended to the list in fOutputLists carrying
     /// the same name, created on demand as an owning TList. A
     /// "PROOF_EventListsList" entry is first replaced by a single
     /// "PROOF_EventList" built from its segments. 'out' is deleted at the end.
     /// NOTE(review): the signature and listing line 3868 are missing from this
     /// listing.
3852 
3854 {
3855  PDB(kOutput,1) Info("StoreOutput","Enter");
3856 
3857  if ( out == 0 ) {
3858  PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3859  return;
3860  }
3861 
3862  TIter next(out);
3863  out->SetOwner(kFALSE); // take ownership of the contents
3864 
3865  if (fOutputLists == 0) {
3866  PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3867  fOutputLists = new TList;
3869  }
3870  // process eventlists first
3871  TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3872  if (lists) {
3873  out->Remove(lists);
3874  TEventList *mainList = new TEventList("PROOF_EventList");
3875  out->Add(mainList);
3876  TIter it(lists);
3877  TEventList *aList;
3878  while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3879  // find file offset (the TDSet element whose file name matches the list name)
3880  TIter nxe(fDSet->GetListOfElements());
3881  TDSetElement *elem;
3882  while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3883  if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3884  break;
3885  }
3886  if (!elem) {
3887  Error("StoreOutput", "found the EventList for %s, but no object with that name "
3888  "in the TDSet", aList->GetName());
3889  continue;
3890  }
3891  Long64_t offset = elem->GetTDSetOffset();
3892 
3893  // shift the list by the number of first event in that file
3894  Long64_t *arr = aList->GetList();
3895  Int_t num = aList->GetN();
3896  if (arr && offset)
3897  for (int i = 0; i < num; i++)
3898  arr[i] += offset;
3899 
3900  mainList->Add(aList); // add to the main list
3901  }
3902  delete lists;
3903  }
3904 
      // Distribute the objects over the per-name lists in fOutputLists
3905  TObject *obj;
3906  while( (obj = next()) ) {
3907  PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3908 
3909  TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3910  if ( list == 0 ) {
3911  PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3912  list = new TList;
3913  list->SetName( obj->GetName() );
3914  list->SetOwner();
3915  fOutputLists->Add( list );
3916  }
3917  list->Add( obj );
3918  }
3919 
3920  delete out;
3921  PDB(kOutput,1) Info("StoreOutput", "leave");
3922 }
3923 
3924 ////////////////////////////////////////////////////////////////////////////////
3924 /// Merge feedback lists.
     /// Returns a newly allocated, owning TList with one merged clone per map of
     /// fFeedbackLists (plus clones of the individual objects when no
     /// Merge(TCollection*) method is found), or 0 when fFeedbackLists is not
     /// set. The objects stored in the maps are not modified: merging is done
     /// into a clone.
     /// NOTE(review): the signature and listing line 3941 (presumably the
     /// iterator 'next') are missing from this listing.
3925 
3927 {
3928  PDB(kFeedback,1)
3929  Info("MergeFeedback","Enter");
3930 
3931  if ( fFeedbackLists == 0 ) {
3932  PDB(kFeedback,1)
3933  Info("MergeFeedback","Leave (no output)");
3934  return 0;
3935  }
3936 
3937  TList *fb = new TList; // collection of feedback objects
3938  fb->SetOwner();
3939 
3941 
3942  TMap *map;
3943  while ( (map = (TMap*) next()) ) {
3944 
3945  PDB(kFeedback,2)
3946  Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3947 
3948  // turn map into list ...
3949 
3950  TList *list = new TList;
3951  TIter keys(map);
3952 
3953 #ifndef R__TH1MERGEFIXED
3954  Int_t nbmx = -1;
3955  TObject *oref = 0;
3956 #endif
3957  while ( TObject *key = keys() ) {
3958  TObject *o = map->GetValue(key);
3959  TH1 *h = dynamic_cast<TH1 *>(o);
3960 #ifndef R__TH1MERGEFIXED
3961  // Temporary fix to cope with the problem in TH1::Merge.
3962  // We need to use as reference histo the one with the largest number
3963  // of bins so that the histos from all submasters can be correctly
3964  // fit in
3965  if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3966  if (h->GetNbinsX() > nbmx) {
3967  nbmx= h->GetNbinsX();
3968  oref = o;
3969  }
3970  }
3971 #endif
3972  if (h) {
      // Histograms are inserted before the first entry that is less filled
      // (shorter buffer when 'h' is buffered; otherwise before the first
      // entry that is still buffered or has fewer entries)
3973  TIter nxh(list);
3974  TH1 *href= 0;
3975  while ((href = (TH1 *)nxh())) {
3976  if (h->GetBuffer()) {
3977  if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3978  } else {
3979  if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3980  }
3981  }
3982  if (href) {
3983  list->AddBefore(href, h);
3984  } else {
3985  list->Add(h);
3986  }
3987  } else {
3988  list->Add(o);
3989  }
3990  }
3991 
3992  // clone first object, remove from list
3993 #ifdef R__TH1MERGEFIXED
3994  TObject *obj = list->First();
3995 #else
3996  TObject *obj = (oref) ? oref : list->First();
3997 #endif
3998  list->Remove(obj);
3999  obj = obj->Clone();
4000  fb->Add(obj);
4001 
4002  if ( list->IsEmpty() ) {
4003  delete list;
4004  continue;
4005  }
4006 
4007  // merge list with clone
4008  TMethodCall callEnv;
4009  if (obj->IsA())
4010  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4011  if (callEnv.IsValid()) {
4012  callEnv.SetParam((Long_t) list);
4013  callEnv.Execute(obj);
4014  } else {
4015  // No Merge interface, return copy of individual objects
4016  while ( (obj = list->First()) ) {
4017  fb->Add(obj->Clone());
4018  list->Remove(obj);
4019  }
4020  }
4021 
4022  delete list;
4023  }
4024 
4025  PDB(kFeedback,1)
4026  Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4027 
4028  return fb;
4029 }
4031 
4032 ////////////////////////////////////////////////////////////////////////////////
4032 /// Store feedback results from the specified slave.
     /// On the client the list is passed to Feedback() and deleted. Otherwise
     /// every object of 'out' is stored in the map of fFeedbackLists carrying its
     /// name, keyed by 'slave' and replacing (and deleting) any value previously
     /// stored for that key. 'out' itself is deleted in both cases.
     /// NOTE(review): 'slave' is cast to TSlave* to read the ordinal used in the
     /// debug messages, while HandleTimer in this file calls
     /// StoreFeedback(this, fb) - confirm that the cast is valid in that case.
     /// The signature and listing line 4056 are missing from this listing.
4033 
4035 {
4036  PDB(kFeedback,1)
4037  Info("StoreFeedback","Enter");
4038 
4039  if ( out == 0 ) {
4040  PDB(kFeedback,1)
4041  Info("StoreFeedback","Leave (empty)");
4042  return;
4043  }
4044 
4045  if ( IsClient() ) {
4046  // in client
4047  Feedback(out);
4048  delete out;
4049  return;
4050  }
4051 
4052  if (fFeedbackLists == 0) {
4053  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4054  fFeedbackLists = new TList;
4056  }
4057 
4058  TIter next(out);
4059  out->SetOwner(kFALSE); // take ownership of the contents
4060 
4061  const char *ord = ((TSlave*) slave)->GetOrdinal();
4062 
4063  TObject *obj;
4064  while( (obj = next()) ) {
4065  PDB(kFeedback,2)
4066  Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4067  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4068  if ( map == 0 ) {
4069  PDB(kFeedback,2)
4070  Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4071  // Map must not be owner (ownership is with regards to the keys (only))
4072  map = new TMap;
4073  map->SetName(obj->GetName());
4074  fFeedbackLists->Add(map);
4075  } else {
4076  PDB(kFeedback,2)
4077  Info("StoreFeedback","%s: removing previous value", ord);
4078  if (map->GetValue(slave))
4079  delete map->GetValue(slave);
4080  map->Remove(slave);
4081  }
4082  map->Add(slave, obj);
4083  PDB(kFeedback,2)
4084  Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4085  }
4086 
4087  delete out;
4088  PDB(kFeedback,1)
4089  Info("StoreFeedback","Leave");
4090 }
4092 
4093 ////////////////////////////////////////////////////////////////////////////////
4093 /// Setup reporting of feedback objects.
     /// Does nothing on the client. Otherwise looks up "FeedbackList" in fInput
     /// and, when it exists and is not empty, (re)creates fFeedbackTimer with
     /// this object as target. The period defaults to 2000 and can be overridden
     /// by the "PROOF_FeedbackPeriod" parameter in fInput.
     /// NOTE(review): the signature is missing from this listing.
4094 
4096 {
4097  if (IsClient()) return; // Client does not need timer
4098 
4099  fFeedback = (TList*) fInput->FindObject("FeedbackList");
4100 
4101  PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4102  fFeedback == 0 ? "NOT ":"");
4103 
4104  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4105 
4106  // OK, feedback was requested, setup the timer
4107  SafeDelete(fFeedbackTimer);
4108  fFeedbackPeriod = 2000;
4109  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4110  fFeedbackTimer = new TTimer;
4111  fFeedbackTimer->SetObject(this);
4112  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4113 }
4115 
4116 ////////////////////////////////////////////////////////////////////////////////
4117 /// Stop reporting of feedback objects.
4118 
4120 {
4121  if (fFeedbackTimer == 0) return;
4122 
4123  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4124 
4125  SafeDelete(fFeedbackTimer);
4126 }
4127 
4128 ////////////////////////////////////////////////////////////////////////////////
4129 /// Send feedback objects to client.
4130 
4132 {
4133  PDB(kFeedback,2) Info("HandleTimer","Entry");
4134 
4135  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4136 
4137  // process local feedback objects
4138 
4139  TList *fb = new TList;
4140  fb->SetOwner();
4141 
4142  TIter next(fFeedback);
4143  while( TObjString *name = (TObjString*) next() ) {
4144  TObject *o = fOutput->FindObject(name->GetName());
4145  if (o != 0) {
4146  fb->Add(o->Clone());
4147  // remove the corresponding entry from the feedback list
4148  TMap *m = 0;
4149  if (fFeedbackLists &&
4150  (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4151  fFeedbackLists->Remove(m);
4152  m->DeleteValues();
4153  delete m;
4154  }
4155  }
4156  }
4157 
4158  if (fb->GetSize() > 0) {
4159  StoreFeedback(this, fb); // adopts fb
4160  } else {
4161  delete fb;
4162  }
4163 
4164  if (fFeedbackLists == 0) {
4165  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4166  return kFALSE;
4167  }
4168 
4169  fb = MergeFeedback();
4170 
4171  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4172 
4174  m << fb;
4175 
4176  // send message to client;
4177  gProofServ->GetSocket()->Send(m);
4178 
4179  delete fb;
4180 
4181  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4182 
4183  return kFALSE; // ignored?
4184 }
4185 
4186 ////////////////////////////////////////////////////////////////////////////////
4187 /// Get next packet for specified slave.
4188 
4190 {
4191  // The first call to this determines the end of initialization
4192  SetInitTime();
4193 
4194  if (fProcPackets) {
4195  Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4196  if (bin >= 0) {
4197  if (fProcPackets->GetBinContent(bin) > 0)
4198  fProcPackets->Fill(slave->GetOrdinal(), -1);
4199  }
4200  }
4201 
4202  TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4203 
4204  if (e == 0) {
4205  PDB(kPacketizer,2)
4206  Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4207  } else if (e == (TDSetElement*) -1) {
4208  PDB(kPacketizer,2)
4209  Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4210  } else {
4211  PDB(kPacketizer,2)
4212  Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4213  slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4214  e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4215  if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4216  }
4217 
4218  return e;
4219 }
4220 
4221 ////////////////////////////////////////////////////////////////////////////////
4222 /// Is the player running on the client?
4223 
4225 {
4227 }
4228 
4229 ////////////////////////////////////////////////////////////////////////////////
4230 /// Draw (support for TChain::Draw()).
4231 /// Returns -1 in case of error or number of selected events in case of success.
4232 
4234  const char *selection, Option_t *option,
4235  Long64_t nentries, Long64_t firstentry)
4236 {
4237  if (!fgDrawInputPars) {
4238  fgDrawInputPars = new THashList;
4239  fgDrawInputPars->Add(new TObjString("FeedbackList"));
4240  fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4241  fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4242  fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4243  fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4244  fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4245  fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4246  fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4247  fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4248  fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4249  fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4250  }
4251 
4252  TString selector, objname;
4253  if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4254  Error("DrawSelect", "parsing arguments");
4255  return -1;
4256  }
4257 
4258  TNamed *varexpobj = new TNamed("varexp", varexp);
4259  TNamed *selectionobj = new TNamed("selection", selection);
4260 
4261  // Save the current input list
4262  TObject *o = 0;
4263  TList *savedInput = new TList;
4264  TIter nxi(fInput);
4265  while ((o = nxi())) {
4266  savedInput->Add(o);
4267  TString n(o->GetName());
4268  if (fgDrawInputPars &&
4269  !fgDrawInputPars->FindObject(o->GetName()) &&
4270  !n.BeginsWith("alias:")) fInput->Remove(o);
4271  }
4272 
4273  fInput->Add(varexpobj);
4274  fInput->Add(selectionobj);
4275 
4276  // Make sure we have an object name
4277  if (objname == "") objname = "htemp";
4278 
4279  fProof->AddFeedback(objname);
4280  Long64_t r = Process(set, selector, option, nentries, firstentry);
4281  fProof->RemoveFeedback(objname);
4282 
4283  fInput->Remove(varexpobj);
4284  fInput->Remove(selectionobj);
4285  if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4286  fInput->Remove(opt);
4287  delete opt;
4288  }
4289 
4290  delete varexpobj;
4291  delete selectionobj;
4292 
4293  // Restore the input list
4294  fInput->Clear();
4295  TIter nxsi(savedInput);
4296  while ((o = nxsi()))
4297  fInput->Add(o);
4298  savedInput->SetOwner(kFALSE);
4299  delete savedInput;
4300 
4301  return r;
4302 }
4303 
4304 ////////////////////////////////////////////////////////////////////////////////
4305 /// Set init time
4306 
4308 {
4309  if (fPacketizer)
4311 }
4312 
4313 //------------------------------------------------------------------------------
4314 
4315 
4317 
4318 ////////////////////////////////////////////////////////////////////////////////
4319 /// Setup feedback.
/// Looks up the "FeedbackList" object in fInput. If it is missing or empty the
/// function returns without touching fFeedbackTimer or fFeedback. Otherwise any
/// existing fFeedbackTimer is deleted, a new TTimer targeting this player is
/// started with fFeedbackPeriod, and fFeedback is set to the list that was found.
4320 
4321 void TProofPlayerSlave::SetupFeedback()
4322 {
4323  TList *fb = (TList*) fInput->FindObject("FeedbackList");
4324  if (fb) {
4325  PDB(kFeedback,1)
4326  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4327  } else {
4328  PDB(kFeedback,1)
4329  Info("SetupFeedback","\"FeedbackList\" NOT found");
4330  }
4331 
4332  if (fb == 0 || fb->GetSize() == 0) return; // nothing requested: leave timer and fFeedback as they are
4333 
4334  // OK, feedback was requested, setup the timer
4335 
4336  SafeDelete(fFeedbackTimer);
4337  fFeedbackPeriod = 2000; // default; presumably replaced by the input parameter read on the next line when present
4338  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4339  fFeedbackTimer = new TTimer;
4340  fFeedbackTimer->SetObject(this);
4341  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // NOTE(review): kTRUE presumably requests a single-shot timer - confirm against TTimer::Start
4342 
4343  fFeedback = fb; // only the pointer is cached; the list is not removed from fInput here
4344 }
4345 
4346 ////////////////////////////////////////////////////////////////////////////////
4347 /// Stop feedback.
4348 
4350 {
4351  if (fFeedbackTimer == 0) return;
4352 
4353  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4354 
4355  SafeDelete(fFeedbackTimer);
4356 }
4357 
4358 ////////////////////////////////////////////////////////////////////////////////
4359 /// Handle timer event.
4360 
4362 {
4363  PDB(kFeedback,2) Info("HandleTimer","Entry");
4364 
4365  // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4366  // so we also send the info to update the progress bar.
4367  if (gProofServ) {
4368  Bool_t sendm = kFALSE;
4370  if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4371  sendm = kTRUE;
4372  if (gProofServ->GetProtocol() > 25) {
4373  m << GetProgressStatus();
4374  } else if (gProofServ->GetProtocol() > 11) {
4376  m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4377  << (Float_t) -1. << (Float_t) ps->GetProcTime()
4378  << (Float_t) ps->GetRate() << (Float_t) -1.;
4379  } else {
4380  m << fTotalEvents << GetEventsProcessed();
4381  }
4382  }
4383  if (sendm) gProofServ->GetSocket()->Send(m);
4384  }
4385 
4386  if (fFeedback == 0) return kFALSE;
4387 
4388  TList *fb = new TList;
4389  fb->SetOwner(kFALSE);
4390 
4391  if (fOutput == 0) {
4392  fOutput = (THashList *) fSelector->GetOutputList();
4393  }
4394 
4395  if (fOutput) {
4396  TIter next(fFeedback);
4397  while( TObjString *name = (TObjString*) next() ) {
4398  // TODO: find object in memory ... maybe allow only in fOutput ?
4399  TObject *o = fOutput->FindObject(name->GetName());
4400  if (o != 0) fb->Add(o);
4401  }
4402  }
4403 
4404  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4405 
4407  m << fb;
4408 
4409  // send message to client;
4410  gProofServ->GetSocket()->Send(m);
4411 
4412  delete fb;
4413 
4414  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4415 
4416  return kFALSE; // ignored?
4417 }
4418 
4419 ////////////////////////////////////////////////////////////////////////////////
4420 /// Handle tree header request.
4421 
4423 {
4425 
4426  TDSet *dset;
4427  (*mess) >> dset;
4428  dset->Reset();
4429  TDSetElement *e = dset->Next();
4430  Long64_t entries = 0;
4431  TFile *f = 0;
4432  TTree *t = 0;
4433  if (!e) {
4434  PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4435  } else {
4436  f = TFile::Open(e->GetFileName());
4437  t = 0;
4438  if (f) {
4439  t = (TTree*) f->Get(e->GetObjName());
4440  if (t) {
4441  t->SetMaxVirtualSize(0);
4442  t->DropBaskets();
4443  entries = t->GetEntries();
4444 
4445  // compute #entries in all the files
4446  while ((e = dset->Next()) != 0) {
4447  TFile *f1 = TFile::Open(e->GetFileName());
4448  if (f1) {
4449  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4450  if (t1) {
4451  entries += t1->GetEntries();
4452  delete t1;
4453  }
4454  delete f1;
4455  }
4456  }
4457  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4458  }
4459  }
4460  }
4461  if (t)
4462  answ << TString("Success") << t;
4463  else
4464  answ << TString("Failed") << t;
4465 
4466  fSocket->Send(answ);
4467 
4468  SafeDelete(t);
4469  SafeDelete(f);
4470 }
4471 
4472 
4473 //------------------------------------------------------------------------------
4474 
4476 
4477 ////////////////////////////////////////////////////////////////////////////////
4478 /// Process specified TDSet on PROOF. Runs on super master.
4479 /// The return value is -1 in case of error; the normal path through this
4480 /// body ends with 'return 0'.
///
/// \param dset          data set to process; it is validated first and its
///                      elements may be modified in place (SetNum/SetFirst) so
///                      that only the requested entry range is dispatched
/// \param selector_file selector passed to SendSelector(); its base name is
///                      forwarded to the submasters
/// \param option        option string forwarded unchanged to the submasters
/// \param nentries      number of entries to process starting at 'first';
///                      -1 disables the upper limit
/// \param first         first entry to process
///
/// When proof->IsMaster() is true, the elements are grouped by mass storage
/// domain (msd), split between the submasters serving each domain, and sent
/// in one kPROOF_PROCESS message per submaster. The results are then collected
/// and merged.
4481 
4482 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4483  Option_t *option, Long64_t nentries,
4484  Long64_t first)
4485 {
4486  fProgressStatus->Reset();
4487  PDB(kGlobal,1) Info("Process","Enter");
4488 
4489  TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof()); // only a TProofSuperMaster session is accepted
4490  if (!proof) return -1;
4491 
4492  delete fOutput; // discard the previous output list and start from an empty one
4493  fOutput = new THashList;
4494 
4495  TPerfStats::Start(fInput, fOutput);
4496 
4497  if (!SendSelector(selector_file)) {
4498  Error("Process", "sending selector %s", selector_file);
4499  return -1;
4500  }
4501 
4502  TCleanup clean(this); // NOTE(review): presumably a scope guard acting on this player on every exit below - confirm
4503  SetupFeedback();
4504 
4505  if (proof->IsMaster()) { // all dispatching to submasters happens inside this branch
4506 
4507  // make sure the DSet is valid
4508  if (!dset->ElementsValid()) {
4509  proof->ValidateDSet(dset);
4510  if (!dset->ElementsValid()) {
4511  Error("Process", "could not validate TDSet");
4512  return -1;
4513  }
4514  }
4515 
4516  TList msds; // one TPair per msd: key = list of submasters, value = list of elements
4517  msds.SetOwner(); // This will delete TPairs
4518 
4519  TList keyholder; // List to clean up key part of the pairs
4520  keyholder.SetOwner();
4521  TList valueholder; // List to clean up value part of the pairs
4522  valueholder.SetOwner();
4523 
4524  // Construct msd list using the slaves
4525  TIter nextslave(proof->GetListOfActiveSlaves());
4526  while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4527  TList *submasters = 0;
4528  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd())); // presumably matched on the name given to the key list below
4529  if (!msd) {
4530  submasters = new TList; // first slave seen for this msd: create its key and value lists
4531  submasters->SetName(sl->GetMsd());
4532  keyholder.Add(submasters);
4533  TList *setelements = new TSortedList(kSortDescending); // elements of this msd are kept sorted (descending)
4534  setelements->SetName(TString(sl->GetMsd())+"_Elements");
4535  valueholder.Add(setelements);
4536  msds.Add(new TPair(submasters, setelements));
4537  } else {
4538  submasters = dynamic_cast<TList*>(msd->Key());
4539  }
4540  if (submasters) submasters->Add(sl); // group the slave under its msd
4541  }
4542 
4543  // Add TDSetElements to msd list
4544  Long64_t cur = 0; //start of next element
4545  TIter nextelement(dset->GetListOfElements());
4546  while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4547 
4548  if (elem->GetNum()<1) continue; // get rid of empty elements
4549 
4550  if (nentries !=-1 && cur>=first+nentries) {
4551  // we are done
4552  break;
4553  }
4554 
4555  if (cur+elem->GetNum()-1<first) {
4556  //element is before first requested entry
4557  cur+=elem->GetNum();
4558  continue;
4559  }
4560 
4561  if (cur<first) {
4562  //modify element to get proper start
4563  elem->SetNum(elem->GetNum()-(first-cur)); // NOTE: this edits the element owned by the caller's dset
4564  elem->SetFirst(elem->GetFirst()+first-cur);
4565  cur=first;
4566  }
4567 
4568  if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4569  cur+=elem->GetNum(); // element lies entirely inside the requested range
4570  } else {
4571  //modify element to get proper end
4572  elem->SetNum(first+nentries-cur);
4573  cur=first+nentries;
4574  }
4575 
4576  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4577  if (!msd) {
4578  Error("Process", "data requires mass storage domain '%s'"
4579  " which is not accessible in this proof session",
4580  elem->GetMsd());
4581  return -1;
4582  } else {
4583  TList *elements = dynamic_cast<TList*>(msd->Value());
4584  if (elements) elements->Add(elem);
4585  }
4586  }
4587 
4588  TList usedmasters; // submasters that were actually sent work; passed to Collect below
4589  TIter nextmsd(msds.MakeIterator());
4590  while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4591  TList *submasters = dynamic_cast<TList*>(msd->Key());
4592  TList *setelements = dynamic_cast<TList*>(msd->Value());
4593 
4594  // distribute elements over the masters
4595  Int_t nmasters = submasters ? submasters->GetSize() : -1; // -1 when the cast failed, so the loop over i does not run
4596  Int_t nelements = setelements ? setelements->GetSize() : -1;
4597  for (Int_t i=0; i<nmasters; i++) {
4598 
4599  Long64_t nent = 0; // entries assigned to submaster i
4600  TDSet set(dset->GetType(), dset->GetObjName(),
4601  dset->GetDirectory());
4602  for (Int_t j = (i*nelements)/nmasters; // submaster i gets the index slice [i*n/m, (i+1)*n/m)
4603  j < ((i+1)*nelements)/nmasters;
4604  j++) {
4605  TDSetElement *elem = setelements ?
4606  dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4607  if (elem) {
4608  set.Add(elem->GetFileName(), elem->GetObjName(),
4609  elem->GetDirectory(), elem->GetFirst(),
4610  elem->GetNum(), elem->GetMsd());
4611  nent += elem->GetNum();
4612  } else {
4613  Warning("Process", "not a TDSetElement object");
4614  }
4615  }
4616 
4617  if (set.GetListOfElements()->GetSize()>0) { // submasters with an empty slice receive no message
4618  TMessage mesg(kPROOF_PROCESS);
4619  TString fn(gSystem->BaseName(selector_file));
4620  TString opt = option;
4621  mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0); // presumably nentries/first; the range was already applied to the elements above
4622 
4623  TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4624  if (sl) {
4625  PDB(kGlobal,1) Info("Process",
4626  "Sending TDSet with %d elements to submaster %s",
4627  set.GetListOfElements()->GetSize(),
4628  sl->GetOrdinal());
4629  sl->GetSocket()->Send(mesg);
4630  usedmasters.Add(sl);
4631 
4632  // setup progress info
4633  fSlaves.AddLast(sl); // each per-slave array below grows by one slot, matching this new fSlaves entry
4634  fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
4635  fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
4636  fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
4637  fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4638  fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
4639  fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
4640  fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
4641  fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
4642  fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
4643  fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
4644  fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
4645  fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4646  fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
4647  fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4648  fSlaveActW.Set(fSlaveActW.GetSize()+1);
4649  fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4650  fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
4651  fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4652  fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
4653  fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4654  } else {
4655  Warning("Process", "not a TSlave object");
4656  }
4657  }
4658  }
4659  }
4660 
4661  if ( !IsClient() ) HandleTimer(0); // HandleTimer is invoked directly once before and once after Collect
4662  PDB(kGlobal,1) Info("Process","Calling Collect");
4663  proof->Collect(&usedmasters);
4664  HandleTimer(0);
4665 
4666  }
4667 
4668  StopFeedback();
4669 
4670  PDB(kGlobal,1) Info("Process","Calling Merge Output");
4671  MergeOutput();
4672 
4673  TPerfStats::Stop();
4674 
4675  return 0;
4676 }
4677 
4678 ////////////////////////////////////////////////////////////////////////////////
4679 /// Report progress.
4680 
4682 {
4683  Int_t idx = fSlaves.IndexOf(sl);
4684  fSlaveProgress[idx] = processed;
4685  if (fSlaveTotals[idx] != total)
4686  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4687  fSlaveTotals[idx] = total;
4688 
4689  Long64_t tot = 0;
4690  Int_t i;
4691  for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4692  Long64_t proc = 0;
4693  for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4694 
4695  Progress(tot, proc);
4696 }
4697 
4698 ////////////////////////////////////////////////////////////////////////////////
4699 /// Report progress.
/// Stores the values reported for slave 'sl' in the per-slave arrays, then
/// aggregates over all slaves and forwards the result to
/// Progress(tot, proc, bytes, init, ptime, erti, srti).
/// A reported time or rate that is not greater than -1 leaves the previously
/// stored value untouched.
/// Aggregation: totals, processed entries and bytes are summed; 'init' is the
/// smallest valid init time and 'ptime' the largest valid processing time;
/// 'erti' is the sum of the valid event rates; 'srti' is the mean of the
/// valid MB rates (0 when there is none).
4700 
4702  Long64_t processed, Long64_t bytesread,
4703  Float_t initTime, Float_t procTime,
4704  Float_t evtrti, Float_t mbrti)
4705 {
4706  PDB(kGlobal,2)
4707  Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4708  processed, bytesread, initTime, procTime, evtrti, mbrti);
4709 
4710  Int_t idx = fSlaves.IndexOf(sl); // NOTE(review): idx is used unchecked; confirm what IndexOf returns when sl is not in fSlaves
4711  if (fSlaveTotals[idx] != total)
4712  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4713  fSlaveTotals[idx] = total;
4714  fSlaveProgress[idx] = processed;
4715  fSlaveBytesRead[idx] = bytesread;
4716  fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4717  fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4718  fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4719  fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4720 
4721  Int_t i;
4722  Long64_t tot = 0;
4723  Long64_t proc = 0;
4724  Long64_t bytes = 0;
4725  Float_t init = -1.;
4726  Float_t ptime = -1.;
4727  Float_t erti = 0.;
4728  Float_t srti = 0.;
4729  Int_t nerti = 0; // number of slaves with a valid event rate
4730  Int_t nsrti = 0; // number of slaves with a valid MB rate
4731  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4732  tot += fSlaveTotals[i];
4733  if (i < fSlaveProgress.GetSize())
4734  proc += fSlaveProgress[i];
4735  if (i < fSlaveBytesRead.GetSize())
4736  bytes += fSlaveBytesRead[i];
4737  if (i < fSlaveInitTime.GetSize())
4738  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4739  init = fSlaveInitTime[i];
4740  if (i < fSlaveProcTime.GetSize())
4741  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4742  ptime = fSlaveProcTime[i];
4743  if (i < fSlaveEvtRti.GetSize())
4744  if (fSlaveEvtRti[i] > -1.) {
4745  erti += fSlaveEvtRti[i];
4746  nerti++;
4747  }
4748  if (i < fSlaveMBRti.GetSize())
4749  if (fSlaveMBRti[i] > -1.) {
4750  srti += fSlaveMBRti[i];
4751  nsrti++;
4752  }
4753  }
4754  srti = (nsrti > 0) ? srti / nsrti : 0.; // divide by the MB-rate count that the guard tests, not by the event-rate count
4755 
4756  Progress(tot, proc, bytes, init, ptime, erti, srti);
4757 }
4758 
4759 ////////////////////////////////////////////////////////////////////////////////
4760 /// Progress signal.
4761 
4763 {
4764  if (pi) {
4765  PDB(kGlobal,2)
4766  Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4767  pi->fTotal, pi->fProcessed, pi->fBytesRead,
4768  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4769  pi->fActWorkers, pi->fEffSessions);
4770 
4771  Int_t idx = fSlaves.IndexOf(wrk);
4772  if (fSlaveTotals[idx] != pi->fTotal)
4773  Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4774  fSlaveTotals[idx] = pi->fTotal;
4775  fSlaveProgress[idx] = pi->fProcessed;
4776  fSlaveBytesRead[idx] = pi->fBytesRead;
4777  fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4778  fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4779  fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4780  fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4781  fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4782  fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4783  fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4784 
4785  Int_t i;
4786  Int_t nerti = 0;
4787  Int_t nsrti = 0;
4788  TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4789  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4790  pisum.fTotal += fSlaveTotals[i];
4791  if (i < fSlaveProgress.GetSize())
4792  pisum.fProcessed += fSlaveProgress[i];
4793  if (i < fSlaveBytesRead.GetSize())
4794  pisum.fBytesRead += fSlaveBytesRead[i];
4795  if (i < fSlaveInitTime.GetSize())
4796  if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4797  pisum.fInitTime = fSlaveInitTime[i];
4798  if (i < fSlaveProcTime.GetSize())
4799  if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4800  pisum.fProcTime = fSlaveProcTime[i];
4801  if (i < fSlaveEvtRti.GetSize())
4802  if (fSlaveEvtRti[i] > -1.) {
4803  pisum.fEvtRateI += fSlaveEvtRti[i];
4804  nerti++;
4805  }
4806  if (i < fSlaveMBRti.GetSize())
4807  if (fSlaveMBRti[i] > -1.) {
4808  pisum.fMBRateI += fSlaveMBRti[i];
4809  nsrti++;
4810  }
4811  if (i < fSlaveActW.GetSize())
4812  pisum.fActWorkers += fSlaveActW[i];
4813  if (i < fSlaveTotS.GetSize())
4814  if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4815  pisum.fTotSessions = fSlaveTotS[i];
4816  if (i < fSlaveEffS.GetSize())
4817  if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4818  pisum.fEffSessions = fSlaveEffS[i];
4819  }
4820  pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4821 
4822  Progress(&pisum);
4823  }
4824 }
4825 
4826 ////////////////////////////////////////////////////////////////////////////////
4827 /// Send progress and feedback to client.
/// Returns kFALSE at once when fFeedbackTimer is null. Otherwise the per-slave
/// arrays are aggregated and the result is streamed into the message 'm',
/// which is sent on gProofServ's socket.
/// Aggregation: totals, processed entries and bytes are summed; 'init' is the
/// smallest valid init time and 'ptime' the largest valid processing time;
/// 'erti' and 'srti' are the means of the valid event and MB rates (0 when
/// there is none).
/// For protocol versions above 25 the values are packed in a
/// TProofProgressInfo; otherwise they are streamed individually.
4828 
4830 {
4831  if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4832 
4833  Int_t i;
4834  Long64_t tot = 0;
4835  Long64_t proc = 0;
4836  Long64_t bytes = 0;
4837  Float_t init = -1.;
4838  Float_t ptime = -1.;
4839  Float_t erti = 0.;
4840  Float_t srti = 0.;
4841  Int_t nerti = 0; // number of slaves with a valid event rate
4842  Int_t nsrti = 0; // number of slaves with a valid MB rate
4843  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4844  tot += fSlaveTotals[i];
4845  if (i < fSlaveProgress.GetSize())
4846  proc += fSlaveProgress[i];
4847  if (i < fSlaveBytesRead.GetSize())
4848  bytes += fSlaveBytesRead[i];
4849  if (i < fSlaveInitTime.GetSize())
4850  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4851  init = fSlaveInitTime[i];
4852  if (i < fSlaveProcTime.GetSize())
4853  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4854  ptime = fSlaveProcTime[i];
4855  if (i < fSlaveEvtRti.GetSize())
4856  if (fSlaveEvtRti[i] > -1.) {
4857  erti += fSlaveEvtRti[i];
4858  nerti++;
4859  }
4860  if (i < fSlaveMBRti.GetSize())
4861  if (fSlaveMBRti[i] > -1.) {
4862  srti += fSlaveMBRti[i];
4863  nsrti++;
4864  }
4865  }
4866  erti = (nerti > 0) ? erti / nerti : 0.;
4867  srti = (nsrti > 0) ? srti / nsrti : 0.; // divide by the MB-rate count that the guard tests, not by the event-rate count
4868 
4870  if (gProofServ->GetProtocol() > 25) {
4871  // Fill the message now
4872  TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4873  erti, srti, -1,
4875  m << &pi;
4876  } else {
4877 
4878  m << tot << proc << bytes << init << ptime << erti << srti;
4879  }
4880 
4881  // send message to client;
4882  gProofServ->GetSocket()->Send(m);
4883 
4884  if (fReturnFeedback)
4886  else
4887  return kFALSE;
4888 }
4889 
4890 ////////////////////////////////////////////////////////////////////////////////
4891 /// Setup reporting of feedback objects and progress messages.
4892 
4894 {
4895  if (IsClient()) return; // Client does not need timer
4896 
4898 
4899  if (fFeedbackTimer) {
4901  return;
4902  } else {
4904  }
4905 
4906  // setup the timer for progress message
4907  SafeDelete(fFeedbackTimer);
4908  fFeedbackPeriod = 2000;
4909  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4910  fFeedbackTimer = new TTimer;
4911  fFeedbackTimer->SetObject(this);
4912  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4913 }
const char * GetHost() const
Definition: TUrl.h:76
const char * GetName() const
Returns name of object.
Definition: TObjString.h:42
Int_t fTotSessions
Definition: TProof.h:197
const char * GetSessionDir() const
Definition: TProofServ.h:260
Bool_t IsRetrieve() const
TList * GetOutputList()
Definition: TQueryResult.h:139
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:912
virtual Int_t GetEntries() const
Definition: TCollection.h:92
Long64_t Process(const char *selector, Long64_t nentries=-1, Option_t *option="")
Process the specified TSelector file 'nentries' times.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Definition: TObject.cxx:823
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
void AddFeedback(const char *name)
Add object to feedback list.
Definition: TProof.cxx:10567
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1213
Long64_t fTotal
Definition: TProof.h:189
TNtuple * GetProgressPerf(Bool_t steal=kFALSE)
static FILE * SetErrorHandlerFile(FILE *ferr)
Set the file stream where to log (default stderr).
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:851
void Progress(Long64_t total, Long64_t processed)
Get query progress information.
Definition: TProof.cxx:9779
ErrorHandlerFunc_t SetErrorHandler(ErrorHandlerFunc_t newhandler)
Set an errorhandler function. Returns the old handler.
Definition: TError.cxx:106
virtual Int_t Fill(Double_t x)
Increment bin with abscissa X by 1.
Definition: TH1.cxx:3159
const char * GetOutputFileName() const
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
Definition: TSystem.cxx:3994
void SetMerging(Bool_t on=kTRUE)
Switch on/off merge timer.
An array of TObjects.
Definition: TObjArray.h:39
const char * GetOrdinal() const
Definition: TSlave.h:135
TProofProgressStatus * fProgressStatus
Definition: TProofPlayer.h:95
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:212
float xmin
Definition: THbookFile.cxx:93
Bool_t IsDraw() const
Definition: TQueryResult.h:152
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:405
UInt_t Convert(Bool_t toGMT=kFALSE) const
Convert fDatime from TDatime format to the standard time_t format.
Definition: TDatime.cxx:179
void ValidateDSet(TDSet *dset)
Validate a TDSet.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:108
Internal class steering processing in PROOF.
Definition: TProofPlayer.h:78
void SetWorkerOrdinal(const char *ordinal)
virtual Double_t GetBinContent(Int_t bin) const
Return content of bin number bin.
Definition: TH1.cxx:4629
long long Long64_t
Definition: RtypesCore.h:69
EExitStatus fExitStatus
status of query in progress
Definition: TProofPlayer.h:93
void Activate(TList *slaves=0)
Activate slave server list.
Definition: TProof.cxx:2396
TSocket * GetSocket() const
Definition: TSlave.h:138
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:394
Int_t fPhysRam
Definition: TSystem.h:169
Int_t AssertSelector(const char *selector_file)
Make sure that a valid selector object Return -1 in case of problems, 0 otherwise.
void FinalizationDone()
Definition: TProof.h:732
R__EXTERN Int_t gErrorIgnoreLevel
Definition: TError.h:107
void StoreOutput(TList *out)
Store output list (may not be used in this class).
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:865
virtual const char * GetName() const
Return name of this collection.
Int_t GetLearnEntries()
Return the number of entries in the learning phase.
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
Definition: TObject.cxx:487
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:473
Long64_t GetEventsProcessed() const
Definition: TProofPlayer.h:224
Bool_t IsFinalized() const
Definition: TQueryResult.h:153
virtual Bool_t SendProcessingProgress(Double_t, Double_t, Bool_t=kFALSE)
void SetMemValues(Long_t vmem=-1, Long_t rmem=-1, Bool_t master=kFALSE)
Set max memory values.
Definition: TStatus.cxx:159
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
Ssiz_t Length() const
Definition: TString.h:390
const double pi
const char Int_t const char TProof Int_t const char const char * msd
Definition: TXSlave.cxx:46
void Print(Option_t *option="") const
Dump the class content.
Collectable string class.
Definition: TObjString.h:32
#define kPEX_ABORTED
float Float_t
Definition: RtypesCore.h:53
Long64_t fBytesRead
Definition: TProof.h:191
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8266
void SetDir(const char *dir, Bool_t raw=kFALSE)
TProofLockPath * GetCacheLock()
Definition: TProofServ.h:294
return c
const char Option_t
Definition: RtypesCore.h:62
tuple offset
Definition: tree.py:93
float ymin
Definition: THbookFile.cxx:93
const char * GetObjName() const
Definition: TDSet.h:229
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashList.cxx:213
void SetValue(TObject *val)
Definition: TMap.h:126
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
This class represents a WWW compatible URL.
Definition: TUrl.h:41
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
const char * GetDataDir() const
Definition: TProofServ.h:263
void SetWriteV3(Bool_t on=kTRUE)
Set/Reset the 'OldStreamer' bit in this instance and its elements.
Definition: TDSet.cxx:1853
virtual Int_t GetDimension() const
Definition: TH1.h:283
static void FilterLocalroot(TString &path, const char *url="root://dum/")
If 'path' is local and 'dsrv' is Xrootd, apply 'path.Localroot' settings, if any. ...
TMacro * GetSelecHdr() const
Definition: TQueryResult.h:135
TQueryResult * GetQueryResult(const char *ref)
Get query result instances referenced 'ref' from the list of results.
virtual Bool_t JoinProcess(TList *workers)
Not implemented: meaningful only in the remote player. Returns kFALSE.
Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg)
Check the memory usage, if requested.
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
Bool_t fSaveResultsPerPacket
Definition: TProofPlayer.h:119
virtual void SetName(const char *name)
Change (i.e. set) the name of the TNamed.
Definition: TNamed.cxx:128
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop process after this event.
void MayNotUse(const char *method) const
Use this method to signal that a method (defined in a base class) may not be called in a derived clas...
Definition: TObject.cxx:971
const char * GetProtocol() const
Definition: TUrl.h:73
TH1 * h
Definition: legend2.C:5
void SetParameter(const char *par, const char *value)
Set input list parameter.
Definition: TProof.cxx:10400
void Add(const char *mesg)
Add an error message.
Definition: TStatus.cxx:46
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Long64_t GetBytesRead() const
virtual Bool_t Merge(Bool_t=kTRUE)
Merge the files.
void AddOutput(TList *out)
Incorporate output list (may not be used in this class).
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
TString fSelectorFileName
Definition: TProofPlayer.h:311
void SetupFeedback()
Setup reporting of feedback objects.
ERunStatus GetRunStatus() const
Definition: TProof.h:979
static Long_t GetVirtMemMax()
VirtMemMax getter.
const char * GetOptions() const
Definition: TQueryResult.h:127
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:821
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge objects in the output lists.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:856
Bool_t Notify()
Definition: TTimer.cxx:65
void RemoveQueryResult(const char *ref)
Remove all query result instances referenced 'ref' from the list of results.
Bool_t HandleTimer(TTimer *timer)
Send progress and feedback to client.
#define gROOT
Definition: TROOT.h:344
TString fLogFileName
Definition: TProof.h:542
Int_t AdoptFile(TFile *f)
Adopt a file already open.
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet for specified slave.
Float_t fEvtRateI
Definition: TProof.h:194
const char * GetFileName() const
UInt_t GetTypeOpt() const
virtual int Load(const char *module, const char *entry="", Bool_t system=kFALSE)
Load a shared library.
Definition: TSystem.cxx:1766
Implement Tree drawing using PROOF.
Definition: TProofDraw.h:57
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition: TSystem.cxx:1395
Basic string class.
Definition: TString.h:137
void SetReadCalls(Long64_t readCalls)
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:980
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
Definition: TDirectory.cxx:955
bool Bool_t
Definition: RtypesCore.h:59
void RemoveFeedback(const char *name)
Remove object from feedback list.
Definition: TProof.cxx:10578
const Bool_t kFALSE
Definition: Rtypes.h:92
#define ENDTRY
Definition: TException.h:73
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
Definition: TProof.cxx:2482
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1459
Bool_t HistoSameAxis(TH1 *h0, TH1 *h1)
Return kTRUE if the histograms 'h0' and 'h1' have the same binning and ranges on the axis (i...
Bool_t IsValid() const
virtual Int_t GetNbinsX() const
Definition: TH1.h:296
void SetOption(Option_t *option)
Definition: TDrawFeedback.h:56
TMacro * GetSelecImp() const
Definition: TQueryResult.h:136
const char * GetTopSessionTag() const
Definition: TProofServ.h:259
virtual Double_t GetEntries() const
return the current number of entries
Definition: TH1.cxx:4051
Bool_t IsSync() const
Definition: TProof.h:707
virtual void SetInitTime()
Set the initialization time.
TLatex * t1
Definition: textangle.C:20
TQueryResult * fQuery
Definition: TProofPlayer.h:102
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
void SetDispatchTimer(Bool_t on=kTRUE)
Enable/disable the timer to dispatch pending events while processing.
Implementation of TProof controlling PROOF federated clusters.
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:592
Bool_t GetValid() const
Definition: TDSet.h:121
Short_t Abs(Short_t d)
Definition: TMathBase.h:110
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:311
Int_t fActWorkers
Definition: TProof.h:196
static Float_t GetMemHWM()
MemHWM getter.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2391
TFile * f
const Int_t kBreak
Definition: TError.h:42
void MapOutputListToDataMembers() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:732
static TOutputListSelectorDataMap * FindInList(TCollection *coll)
Find a TOutputListSelectorDataMap in a collection.
Int_t SavePartialResults(Bool_t queryend=kFALSE, Bool_t force=kFALSE)
Save the partial results of this query to a dedicated file under the user data directory.
TList * fInput
Definition: TProofPlayer.h:84
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Definition: TString.h:625
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2446
Double_t GetProcTime() const
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:749
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
const char * GetObjName() const
Definition: TDSet.h:122
TMessage * fProcessMessage
Histogram with packets being processed (owned by TPerfStats)
Definition: TProofPlayer.h:310
This class defines a UUID (Universally Unique IDentifier), also known as GUIDs (Globally Unique IDent...
Definition: TUUID.h:44
virtual Int_t GetN() const
Definition: TEventList.h:58
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
Long64_t GetNum() const
Definition: TDSet.h:116
const char * Data() const
Definition: TString.h:349
void SetStopTimer(Bool_t on=kTRUE, Bool_t abort=kFALSE, Int_t timeout=0)
Enable/disable the timer to stop/abort processing.
Manages an element of a TDSet.
Definition: TDSet.h:68
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
static void SetLastEntry(Long64_t lastentry)
Set the last entry before exception.
const char * GetDirectory() const
Definition: TDSet.h:230
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:839
#define SafeDelete(p)
Definition: RConfig.h:436
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Definition: TSystem.cxx:1294
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition: TObject.cxx:203
Double_t dot(const TVector2 &v1, const TVector2 &v2)
Definition: CsgOps.cxx:333
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:75
virtual void SetAutoFlush(Long64_t autof=-30000000)
This function may be called at the start of a program to change the default value for fAutoFlush...
Definition: TTree.cxx:7382
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and returns a TString...
Definition: TString.cxx:2321
#define PDB(mask, level)
Definition: TProofDebug.h:58
Long64_t fReadBytesRun
Definition: TProofPlayer.h:97
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:36
TEventIter * fEvIter
period (ms) for sending intermediate results
Definition: TProofPlayer.h:91
const char * ord
Definition: TXSlave.cxx:46
Float_t GetEffSessions() const
Definition: TProofServ.h:277
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
const char * GetMsd() const
Definition: TDSet.h:119
This class holds the status of an ongoing operation and collects error messages.
Definition: TStatus.h:39
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:33
void Class()
Definition: Class.C:29
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
Definition: TProof.cxx:6117
virtual Bool_t IsEmpty() const
Definition: TCollection.h:99
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
static Long64_t GetFileBytesRead()
Static function returning the total number of bytes read from all files.
Definition: TFile.cxx:4319
void StopFeedback()
Stop reporting of feedback objects.
Int_t GetQuerySeqNum() const
Definition: TProofServ.h:273
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=0)
Resolve the file type as a function of the protocol field in 'name'.
Definition: TFile.cxx:4566
int d
Definition: tornado.py:11
void AddQueryResult(TQueryResult *q)
Add query result to the list, making sure that there are no duplicates.
virtual TObject * FindObject(const char *name) const
search object named name in the list of functions
Definition: TH1.cxx:3578
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
virtual Bool_t HandleTimer(TTimer *timer)
Execute action in response of a timer timing out.
Definition: TObject.cxx:469
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1575
void DeleteDrawFeedback(TDrawFeedback *f)
Delete draw feedback object.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2676
TList * GetListOfElements() const
Definition: TDSet.h:231
if(pyself &&pyself!=Py_None)
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
Set draw feedback option.
void ResetMergePrg()
Reset the merge progress notificator.
Definition: TProof.cxx:2472
A sorted doubly linked list.
Definition: TSortedList.h:30
void Info(const char *location, const char *msgfmt,...)
TList * MergeFeedback()
Merge feedback lists.
Long64_t fProcessedRun
Read calls in this run.
Definition: TProofPlayer.h:99
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF.
TProofProgressStatus * GetProgressStatus() const
Definition: TProofPlayer.h:242
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
tuple np
Definition: multifit.py:30
void Validate()
Validate the TDSet by opening files.
Definition: TDSet.cxx:1568
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1951
Bool_t IsParallel() const
Definition: TProof.h:975
void SetOutputFileName(const char *name)
Set the name of the output file; in the form of an Url.
static void GetMemValues(Long_t &vmax, Long_t &rmax)
Get memory usage.
Definition: TPerfStats.cxx:790
virtual void SetupFeedback()
Set up feedback (may not be used in this class).
Int_t fNotIdle
Definition: TProof.h:536
Long64_t GetMsgSizeHWM() const
Definition: TProofServ.h:287
static Long_t GetResMemMax()
ResMemMax getter.
void SetBytesRead(Long64_t bytesRead)
TList * GetOutputList() const
Get output list.
Method or function calling interface.
Definition: TMethodCall.h:41
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
void SetCurrentQuery(TQueryResult *q)
Set current query and save previous value.
TH1F * h1
Definition: legend1.C:5
const Bool_t kSortDescending
Definition: TList.h:41
A container class for query results.
Definition: TQueryResult.h:44
Double_t GetXmin() const
Definition: TAxis.h:137
TDSetElement * Current() const
Definition: TDSet.h:238
#define kPEX_STOPPED
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
void Error(const char *location, const char *msgfmt,...)
THashList * fOutput
Definition: TProofPlayer.h:85
static void SetLimitsFinder(THLimitsFinder *finder)
This static function can be used to specify a finder derived from THLimitsFinder. ...
char * out
Definition: TBase64.cxx:29
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
static void Start(TList *input, TList *output)
Initialize PROOF statistics run.
Definition: TPerfStats.cxx:744
void SetInitTime()
Set init time.
Bool_t IsLite() const
Definition: TProof.h:969
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:10496
Long64_t GetFirst() const
Definition: TDSet.h:114
void SetFileName(const char *name)
Float_t fProcTime
Definition: TProof.h:193
void Feedback(TList *objs)
Feedback signal.
Int_t GetBufferSize() const
Definition: TH1.h:238
TFileMerger * GetFileMerger(Bool_t local=kFALSE)
Get instance of the file merger to be used in 'merge' mode.
virtual Bool_t Notify()
Notify when timer times out.
Definition: TTimer.cxx:141
A doubly linked list.
Definition: TList.h:47
TObject * GetEntryList() const
Definition: TDSet.h:251
Bool_t fRedirLog
Definition: TProof.h:541
void ClearInput()
Clear input list.
void UpdateAutoBin(const char *name, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Update automatic binning parameters for given object "name".
static Float_t GetMemStop()
MemStop getter.
#define CATCH(n)
Definition: TException.h:67
Int_t GetPort() const
Definition: TUrl.h:87
static void Setup(TList *input)
Setup the PROOF input list with requested statistics and tracing options.
Definition: TPerfStats.cxx:727
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
const char * GetName() const
Returns name of object.
Definition: TSlave.h:128
TSocket * GetSocket() const
Definition: TProofServ.h:270
EFileType
File type.
Definition: TFile.h:163
tuple outfile
Definition: mrt.py:21
TThread * t[5]
Definition: threadsh1.C:13
void Clear(Option_t *option="")
Remove all objects from the list.
Definition: THashList.cxx:168
float ymax
Definition: THbookFile.cxx:93
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet (may not be used in this class).
static Bool_t gAbort
const char * GetFileName() const
Definition: TDSet.h:113
Float_t fInitTime
Definition: TProof.h:192
Int_t fCpus
Definition: TSystem.h:165
const char * GetPrefix() const
Definition: TProofServ.h:289
ROOT::R::TRInterface & r
Definition: Object.C:4
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2207
Class to manage histogram axis.
Definition: TAxis.h:36
Float_t GetProcTime() const
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
Int_t GetNbins() const
Definition: TAxis.h:125
static void GetLocalServer(TString &dsrv)
Extract LOCALDATASERVER info in 'dsrv'.
const char * GetLibList() const
Definition: TQueryResult.h:137
1-D histogram with an int per channel (see TH1 documentation)
Definition: TH1.h:529
static Bool_t IsStandardDraw(const char *selec)
Find out if this is a standard selection used for Draw actions (either TSelectorDraw, TProofDraw or deriving from them).
Definition: TSelector.cxx:235
TDirectory * GetDirectory() const
Definition: TTree.h:385
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:187
void AddInput(TObject *inp)
Add object to input list.
virtual void StopFeedback()
Stop feedback (may not be used in this class).
void SetupFeedback()
Setup reporting of feedback objects and progress messages.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:675
Long64_t GetTotalEntries() const
Long_t fMemVirtual
Definition: TSystem.h:207
TObject * Next()
Definition: TCollection.h:158
Int_t fSeqNum
Definition: TProof.h:557
virtual TEnvRec * Lookup(const char *n)
Loop over all resource records and return the one with name.
Definition: TEnv.cxx:550
void AddOutput(TList *out)
Incorporate the content of the received output list 'out' into the final output list fOutput...
TObject * Value() const
Definition: TMap.h:125
void SetMerged(Bool_t merged=kTRUE)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2308
virtual Bool_t JoinProcess(TList *workers)
Prepares the given list of new workers to join a progressing process.
TList * GetInputList()
Definition: TQueryResult.h:128
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
TMarker * m
Definition: textangle.C:8
Bool_t SetDataMembers(TSelector *sel) const
Given an output list, set the data members of a TSelector.
char * Form(const char *fmt,...)
Class to steer the merging of files produced on the workers.
Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF worker.
bool first
Definition: line3Dfit.C:48
virtual Bool_t SendProcessingStatus(const char *, Bool_t=kFALSE)
const char * AsString() const
Return UUID as string. Copy string immediately since it will be reused.
Definition: TUUID.cxx:536
void SaveSource(FILE *fp)
Save macro source in file pointer fp.
Definition: TMacro.cxx:379
virtual Int_t GetNbinsZ() const
Definition: TH1.h:298
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:57
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
void SetLastMergingMsg(TObject *obj)
Set the message to be notified in case of exception.
Long64_t fProcessed
Definition: TProof.h:190
Int_t GetProtocol() const
Definition: TProofServ.h:265
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:81
TAxis * GetYaxis()
Definition: TH1.h:320
virtual Bool_t HandleTimer(TTimer *timer)
Send feedback objects to client.
float xmax
Definition: THbookFile.cxx:93
TStopwatch fQuerySTW
Definition: TProof.h:627
Int_t ReinitSelector(TQueryResult *qr)
Reinitialize fSelector using the selector files in the query result.
static Int_t GetFileReadCalls()
Static function returning the total number of read calls from all files.
Definition: TFile.cxx:4336
const char * GetCacheDir() const
Definition: TProofServ.h:262
Bool_t IsNull() const
Definition: TString.h:387
void SetName(const char *name)
Definition: TCollection.h:116
Int_t fProtocol
Definition: TProof.h:605
void Reset(Detail::TBranchProxy *x)
void HandleRecvHisto(TMessage *mess)
Receive histo from slave.
Int_t SendFile(const char *file, Int_t opt=(kBinary|kForward|kCp|kCpBin), const char *rfile=0, TSlave *sl=0)
Send a file to master or slave servers.
Definition: TProof.cxx:6892
void Warning(const char *location, const char *msgfmt,...)
Float_t fEffSessions
Definition: TProof.h:198
Long64_t GetEntries() const
Long64_t entry
Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
void FeedBackCanvas(const char *name, Bool_t create)
Create/destroy a named canvas for feedback.
const Double_t * GetBuffer() const
Definition: TH1.h:239
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
Definition: TMap.cxx:150
#define Printf
Definition: TGeoToOCC.h:18
Bool_t Matches(const char *ref)
Return TRUE if reference ref matches.
Int_t AddOutputObject(TObject *obj)
Incorporate the received object 'obj' into the output list fOutput.
TH1F * total
Definition: threadsh2.C:15
TDatime GetStartTime() const
Definition: TQueryResult.h:125
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
Int_t GetSize() const
Definition: TArray.h:49
Long64_t Merge(TCollection *list)
Merge objects from the list into this object.
static TEventIter * Create(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
Create an instance of the appropriate iterator.
Definition: TEventIter.cxx:150
virtual Int_t FindBin(Double_t x)
Find bin number corresponding to abscissa x.
Definition: TAxis.cxx:264
void UpdateProgressInfo()
Update fProgressStatus.
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Definition: TSelector.cxx:140
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=0)
Redirect standard output (stdout, stderr) to the specified file.
Definition: TSystem.cxx:1625
void SetHost(const char *host)
Definition: TUrl.h:93
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
long Long_t
Definition: RtypesCore.h:50
int Ssiz_t
Definition: RtypesCore.h:63
Bool_t IsClient() const
Definition: TProofPlayer.h:220
The packetizer is a load balancing object created for each query.
TList * GetInputList()
Get input list.
Definition: TProof.cxx:10331
Bool_t IsClient() const
Is the player running on the client?
TString fOutputFilePath
Definition: TProofPlayer.h:115
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:106
Int_t GetBufferLength() const
Definition: TH1.h:237
TFile * fOutputFile
Definition: TProofPlayer.h:116
TList * GetListOfActiveSlaves() const
Definition: TProof.h:761
virtual const char * GetIncludePath()
Get the list of include path.
Definition: TSystem.cxx:3728
virtual Int_t GetSize() const
Definition: TCollection.h:95
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8064
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge output (may not be used in this class).
void RestorePreviousQuery()
Definition: TProofPlayer.h:187
void ResetParam()
Reset parameter list. To be used before the first call to SetParam().
TList * GetMergeList() const
Definition: TFileMerger.h:80
static Int_t init()
virtual Func_t DynFindSymbol(const char *module, const char *entry)
Find specific entry point in specified library.
Definition: TSystem.cxx:1930
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
Int_t fMergersCount
Definition: TProof.h:584
double Double_t
Definition: RtypesCore.h:55
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:307
void RedirectOutput(Bool_t on=kTRUE)
Control output redirection to TProof::fLogFileW.
char * DynamicPathName(const char *lib, Bool_t quiet=kFALSE)
Find a dynamic library called lib using the system search paths.
Definition: TSystem.cxx:1906
Int_t Lock()
Locks the directory.
TObject * Key() const
Definition: TMap.h:124
#define TRY
Definition: TException.h:60
Bool_t IsMaster() const
Definition: TProofServ.h:306
Describe directory structure in memory.
Definition: TDirectory.h:44
void Feedback(TList *objs)
Get list of feedback objects.
Definition: TProof.cxx:9846
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:44
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
void dir(char *path=0)
Definition: rootalias.C:30
const char * GetOrdinal() const
Definition: TProofServ.h:266
TNamed()
Definition: TNamed.h:40
virtual void SendInputDataFile()
Send the input data objects to the master; the objects are taken from the dedicated list and / or the...
Definition: TProof.cxx:10190
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:342
EExitStatus GetExitStatus() const
Definition: TProofPlayer.h:223
Double_t GetXmax() const
Definition: TAxis.h:138
int nentries
Definition: THbookFile.cxx:89
ErrorHandlerFunc_t fErrorHandler
tdset for current processing
Definition: TProofPlayer.h:307
Set the selector's data members to the corresponding elements of the output list. ...
The TH1 histogram class.
Definition: TH1.h:80
virtual TMD5 * Checksum()
Returns checksum of the current content.
Definition: TMacro.cxx:192
Class to find axis limits and synchronize them between workers.
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Bool_t IsMaster() const
Definition: TProof.h:972
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1347
Bool_t MergeOutputFiles()
Merge output in files.
virtual void DispatchOneEvent(Bool_t pendingOnly=kFALSE)
Dispatch a single event.
Definition: TSystem.cxx:433
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (may not be used in this class).
Bool_t IsMerged() const
Int_t InitPacketizer(TDSet *dset, Long64_t nentries, Long64_t first, const char *defpackunit, const char *defpackdata)
Init the packetizer. Return 0 on success (fPacketizer is correctly initialized), -1 on failure...
Int_t Unlock()
Unlock the directory.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2801
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:349
void SetFailedPackets(TList *list)
Bool_t IsRegister() const
void SetSelectorDataMembersFromOutputList()
Set the selector's data members: find the mapping of data members to output list entries in the outpu...
Long64_t Finalize(Int_t query=-1, Bool_t force=kFALSE)
Finalize the qry-th query in fQueries.
Definition: TProof.cxx:5881
virtual Bool_t SendSelector(const char *selector_file)
Send the selector file(s) to master or worker nodes.
#define name(a, b)
Definition: linkTestLib0.cpp:5
Bool_t HandleTimer(TTimer *timer)
Handle timer event.
virtual Bool_t Add(TF1 *h1, Double_t c1=1, Option_t *option="")
Performs the operation: this = this + c1*f1. If errors are defined (see TH1::Sumw2), errors are also recalculated.
Definition: TH1.cxx:780
TAxis * GetZaxis()
Definition: TH1.h:321
virtual void Add(const TEventList *list)
Merge contents of alist with this list.
Definition: TEventList.cxx:114
void Throw(int code)
If an exception context has been set (using the TRY and RETRY macros) jump back to where it was set...
Definition: TException.cxx:27
virtual Int_t DrawCanvas(TObject *obj)
Draw the object if it is a canvas.
FILE * fLogFileW
Definition: TProof.h:543
Mother of all ROOT objects.
Definition: TObject.h:58
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set. If the removeMissing option is se...
Definition: TDSet.cxx:1584
const char Int_t const char TProof * proof
Definition: TXSlave.cxx:46
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:557
void StopFeedback()
Stop feedback.
virtual Int_t GetNbinsY() const
Definition: TH1.h:297
virtual Long64_t * GetList() const
Definition: TEventList.h:57
typedef void((*Func_t)())
TObject * HandleHistogram(TObject *obj, Bool_t &merged)
Low statistic histograms need a special treatment when using autobin.
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:246
Int_t Collect(ESlaves list=kActive, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from the slave servers.
Definition: TProof.cxx:2734
void Progress(Long64_t total, Long64_t processed)
Report progress (may not be used in this class).
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
Definition: TROOT.cxx:2480
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Definition: TDirectory.cxx:433
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:360
Int_t AddOutputObject(TObject *obj)
Incorporate output object (may not be used in this class).
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged)
Incorporate object 'obj' in the list 'out'.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:68
Utility class to draw objects in the feedback list during queries.
Definition: TDrawFeedback.h:39
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:182
void Progress(Long64_t total, Long64_t processed)
Progress signal.
Definition: TProofPlayer.h:446
TFileCollection * GetFileCollection()
Get instance of the file collection to be used in 'dataset' mode.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
TF1 * f1
Definition: legend1.C:11
void SetParam(Long_t l)
Add a long method parameter.
Double_t GetRate() const
Bool_t IsTopMaster() const
Definition: TProofServ.h:308
const char * GetDir(Bool_t raw=kFALSE) const
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:183
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop the process after this event.
TVirtualPacketizer * fPacketizer
Definition: TProofPlayer.h:304
virtual TIterator * MakeIterator(Bool_t dir=kIterForward) const
Return a list iterator.
Definition: TList.cxx:604
Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)
Parse the arguments from var, sel and opt and fill the selector and object name accordingly.
virtual void StoreFeedback(TObject *slave, TList *out)
Store feedback results from the specified slave.
const char * GetType() const
Definition: TDSet.h:228
void Reset()
Definition: TStopwatch.h:54
TObject * GetOutput(const char *name) const
Get output object by name.
Int_t GetTotSessions() const
Definition: TProofServ.h:275
Float_t GetMaxProcTime() const
Definition: TDSet.h:144
void SetExitStatus(Int_t est)
Definition: TStatus.h:73
static void Stop()
Terminate the PROOF statistics run.
Definition: TPerfStats.cxx:764
virtual Long64_t GetEntries() const
Definition: TTree.h:386
void StoreFeedback(TObject *slave, TList *out)
Store feedback list (may not be used in this class).
Long64_t GetCacheSize()
Return the size in bytes of the cache.
A TTree object has a header with a name and a title.
Definition: TTree.h:98
#define gDirectory
Definition: TDirectory.h:221
Float_t fMBRateI
Definition: TProof.h:195
static void SetLastMsg(const char *lastmsg)
Set the message to be sent back in case of exceptions.
std::mutex fStopTimerMtx
Definition: TProofPlayer.h:108
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
Bool_t IsMerge() const
void ResetBit(UInt_t f)
Definition: TObject.h:172
Bool_t fCreateSelObj
the latest selector
Definition: TProofPlayer.h:87
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1191
TList * GetConfigParams(Bool_t steal=kFALSE)
virtual void PrintFiles(Option_t *options)
Print list of files being merged.
virtual Int_t IndexOf(const TObject *obj) const
TClass * GetClass() const
Definition: TMessage.h:76
Bool_t IsParallel() const
True if in parallel mode.
Float_t GetInitTime() const
Class describing a PROOF worker server.
Definition: TSlave.h:50
static void output(int code)
Definition: gifencode.c:226
Container class for processing statistics.
A TSelector object is used by TTree::Draw, TTree::Scan and TTree::Process to navigate in a TTree and...
Definition: TSelector.h:39
static void ErrorHandler(Int_t level, Bool_t abort, const char *location, const char *msg)
The PROOF error handler function.
const Bool_t kTRUE
Definition: Rtypes.h:91
float * q
Definition: THbookFile.cxx:87
virtual void Reset()
Reset merger file list.
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
Definition: TNamed.cxx:152
TObject * obj
virtual TObject * FindObject(const char *name) const
Must be redefined in derived classes.
Definition: TObject.cxx:379
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:27
static void AutoBinFunc(TString &key, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Definition: TProof.cxx:2311
static void SetMemValues()
Record memory usage.
Definition: TPerfStats.cxx:778
const Int_t n
Definition: legend1.C:16
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (support for TChain::Draw()).
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write all objects in this collection.
void NotifyMemory(TObject *obj)
Print out the memory record after merging object 'obj'. This record is used by the memory monitor...
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.
void StoreOutput(TList *out)
Store received output list.
TStopwatch * fMergeSTW
Definition: TProofPlayer.h:313
Long_t fMemResident
Definition: TSystem.h:206
const char * GetFile() const
Definition: TUrl.h:78
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
Definition: TSystem.cxx:2360
void SetProcessing(Bool_t on=kTRUE)
Set processing bit according to 'on'.
TAxis * GetXaxis()
Definition: TH1.h:319
void Progress(Long64_t total, Long64_t processed)
Progress signal.
virtual ~TProofPlayer()
Destructor.
virtual Long64_t Merge(TCollection *list)
Add all histograms in the collection to this histogram.
Definition: TH1.cxx:5305
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
Definition: TSystem.cxx:3930
ClassImp(TProofPlayerLocal) Long64_t TProofPlayerLocal
Process the specified TSelector object 'nentries' times.
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:234
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
Send the input data file to the workers.
Definition: TProof.cxx:12959
void Feedback(TList *objs)
Set feedback list (may not be used in this class).
TDrawFeedback * CreateDrawFeedback(TProof *p)
Draw feedback creation proxy.
Bool_t fSavePartialResults
Definition: TProofPlayer.h:118
Int_t Remove(TDSetElement *elem, Bool_t deleteElem=kTRUE)
Remove TDSetElement 'elem' from the list.
Definition: TDSet.cxx:1555
const char * GetDataDirOpts() const
Definition: TProofServ.h:264
Stopwatch class.
Definition: TStopwatch.h:30
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904