Logo ROOT   6.08/07
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofPlayer
13 \ingroup proofkernel
14 
15 Internal class steering processing in PROOF.
16 Instances of the TProofPlayer class are created on the worker nodes
17 per session and do the processing.
18 Instances of its subclass - TProofPlayerRemote are created per each
19 query on the master(s) and on the client. On the master(s),
20 TProofPlayerRemote coordinate processing, check the dataset, create
21 the packetizer and take care of merging the results of the workers.
22 The instance on the client collects information on the input
23 (dataset and selector), it invokes the Begin() method and finalizes
24 the query by calling Terminate().
25 
26 */
27 
28 #include "TProofDraw.h"
29 #include "TProofPlayer.h"
30 #include "THashList.h"
31 #include "TEnv.h"
32 #include "TEventIter.h"
33 #include "TVirtualPacketizer.h"
34 #include "TSelector.h"
35 #include "TSocket.h"
36 #include "TProofServ.h"
37 #include "TProof.h"
38 #include "TProofOutputFile.h"
39 #include "TProofSuperMaster.h"
40 #include "TSlave.h"
41 #include "TClass.h"
42 #include "TROOT.h"
43 #include "TError.h"
44 #include "TException.h"
45 #include "MessageTypes.h"
46 #include "TMessage.h"
47 #include "TDSetProxy.h"
48 #include "TString.h"
49 #include "TSystem.h"
50 #include "TFile.h"
51 #include "TFileCollection.h"
52 #include "TFileInfo.h"
53 #include "TFileMerger.h"
54 #include "TProofDebug.h"
55 #include "TTimer.h"
56 #include "TMap.h"
57 #include "TPerfStats.h"
58 #include "TStatus.h"
59 #include "TEventList.h"
60 #include "TProofLimitsFinder.h"
61 #include "THashList.h"
62 #include "TSortedList.h"
63 #include "TTree.h"
64 #include "TEntryList.h"
65 #include "TDSet.h"
66 #include "TDrawFeedback.h"
67 #include "TNamed.h"
68 #include "TObjString.h"
69 #include "TQueryResult.h"
70 #include "TMD5.h"
71 #include "TMethodCall.h"
72 #include "TObjArray.h"
73 #include "TH1.h"
74 #include "TVirtualMonitoring.h"
75 #include "TParameter.h"
77 #include "TStopwatch.h"
78 
79 // Timeout exception
80 #define kPEX_STOPPED 1001
81 #define kPEX_ABORTED 1002
82 
83 // To flag an abort condition: use a local static variable to avoid
84 // warnings about problems with longjumps
85 static Bool_t gAbort = kFALSE;
86 
87 class TAutoBinVal : public TNamed {
88 private:
89  Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
90 
91 public:
92  TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
93  Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
94  {
95  fXmin = xmin; fXmax = xmax;
96  fYmin = ymin; fYmax = ymax;
97  fZmin = zmin; fZmax = zmax;
98  }
99  void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
100  Double_t& ymax, Double_t& zmin, Double_t& zmax)
101  {
102  xmin = fXmin; xmax = fXmax;
103  ymin = fYmin; ymax = fYmax;
104  zmin = fZmin; zmax = fZmax;
105  }
106 
107 };
108 
109 //
110 // Special timer to dispatch pending events while processing
111 ////////////////////////////////////////////////////////////////////////////////
112 
113 class TDispatchTimer : public TTimer {
114 private:
115  TProofPlayer *fPlayer;
116 
117 public:
118  TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
119 
120  Bool_t Notify();
121 };
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Handle expiration of the timer associated with dispatching pending
124 /// events while processing. We must act as fast as possible here, so
125 /// we just set a flag submitting a request for dispatching pending events
126 
127 Bool_t TDispatchTimer::Notify()
128 {
129  if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
130 
131  fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
132 
133  // Needed for the next shot
134  Reset();
135  return kTRUE;
136 }
137 
138 //
139 // Special timer to notify reach of max packet proc time
140 ////////////////////////////////////////////////////////////////////////////////
141 
142 class TProctimeTimer : public TTimer {
143 private:
144  TProofPlayer *fPlayer;
145 
146 public:
147  TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
148 
149  Bool_t Notify();
150 };
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Handle expiration of the timer associated with dispatching pending
153 /// events while processing. We must act as fast as possible here, so
154 /// we just set a flag submitting a request for dispatching pending events
155 
156 Bool_t TProctimeTimer::Notify()
157 {
158  if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
159 
160  fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
161 
162  // One shot only
163  return kTRUE;
164 }
165 
166 //
167 // Special timer to handle stop/abort request via exception raising
168 ////////////////////////////////////////////////////////////////////////////////
169 
170 class TStopTimer : public TTimer {
171 private:
172  Bool_t fAbort;
173  TProofPlayer *fPlayer;
174 
175 public:
176  TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
177 
178  Bool_t Notify();
179 };
180 
181 ////////////////////////////////////////////////////////////////////////////////
182 /// Constructor for the timer to stop/abort processing.
183 /// The 'timeout' is in seconds.
184 /// Make sure that 'to' make sense, i.e. not larger than 10 days;
185 /// the minimum value is 10 ms (0 does not seem to start the timer ...).
186 
187 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
188  : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
189 {
190  if (gDebug > 0)
191  Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
192 
193  fPlayer = p;
194  fAbort = abort;
195 
196  if (gDebug > 1)
197  Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
198 }
199 
200 ////////////////////////////////////////////////////////////////////////////////
201 /// Handle the signal coming from the expiration of the timer
202 /// associated with an abort or stop request.
203 /// We raise an exception which will be processed in the
204 /// event loop.
205 
206 Bool_t TStopTimer::Notify()
207 {
208  if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
209 
210  if (fAbort)
212  else
214 
215  return kTRUE;
216 }
217 
218 //------------------------------------------------------------------------------
219 
221 
223 
224 ////////////////////////////////////////////////////////////////////////////////
225 /// Default ctor.
226 
228  : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
229  fFeedbackTimer(0), fFeedbackPeriod(2000),
230  fEvIter(0), fSelStatus(0),
231  fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
232  fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
233  fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
234  fProcTimeTimer(0), fProcTime(0),
235  fOutputFile(0),
236  fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
237 {
238  fInput = new TList;
245 
246  static Bool_t initLimitsFinder = kFALSE;
247  if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
249  initLimitsFinder = kTRUE;
250  }
251 
252 }
253 
254 ////////////////////////////////////////////////////////////////////////////////
255 /// Destructor.
256 
258 {
259  fInput->Clear("nodelete");
261  // The output list is owned by fSelector and destroyed in there
270 }
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Set processing bit according to 'on'
274 
276 {
277  if (on)
279  else
281 }
282 
283 ////////////////////////////////////////////////////////////////////////////////
284 /// Stop the process after this event. If timeout is positive, start
285 /// a timer firing after timeout seconds to hard-stop time-expensive
286 /// events.
287 
289 {
290  if (gDebug > 0)
291  Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
292 
293  if (fEvIter != 0)
294  fEvIter->StopProcess(abort);
295  Long_t to = 1;
296  if (abort == kTRUE) {
298  } else {
300  to = timeout;
301  }
302  // Start countdown, if needed
303  if (to > 0)
304  SetStopTimer(kTRUE, abort, to);
305 }
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Enable/disable the timer to dispatch pening events while processing.
309 
311 {
314  if (on) {
315  fDispatchTimer = new TDispatchTimer(this);
317  }
318 }
319 
320 ////////////////////////////////////////////////////////////////////////////////
321 /// Enable/disable the timer to stop/abort processing.
322 /// The 'timeout' is in seconds.
323 
325 {
326  std::lock_guard<std::mutex> lock(fStopTimerMtx);
327 
328  // Clean-up the timer
330  if (on) {
331  // create timer
332  fStopTimer = new TStopTimer(this, abort, timeout);
333  // Start the countdown
334  fStopTimer->Start();
335  if (gDebug > 0)
336  Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
337  (abort ? "ABORT" : "STOP"), timeout);
338  } else {
339  if (gDebug > 0)
340  Info ("SetStopTimer", "timer STOPPED");
341  }
342 }
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 /// Add query result to the list, making sure that there are no
346 /// duplicates.
347 
349 {
350  if (!q) {
351  Warning("AddQueryResult","query undefined - do nothing");
352  return;
353  }
354 
355  // Treat differently normal and draw queries
356  if (!(q->IsDraw())) {
357  if (!fQueryResults) {
358  fQueryResults = new TList;
359  fQueryResults->Add(q);
360  } else {
361  TIter nxr(fQueryResults);
362  TQueryResult *qr = 0;
363  TQueryResult *qp = 0;
364  while ((qr = (TQueryResult *) nxr())) {
365  // If same query, remove old version and break
366  if (*qr == *q) {
367  fQueryResults->Remove(qr);
368  delete qr;
369  break;
370  }
371  // Record position according to start time
372  if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
373  qp = qr;
374  }
375 
376  if (!qp) {
378  } else {
379  fQueryResults->AddAfter(qp, q);
380  }
381  }
382  } else if (IsClient()) {
383  // If max reached, eliminate first the oldest one
385  TIter nxr(fQueryResults);
386  TQueryResult *qr = 0;
387  while ((qr = (TQueryResult *) nxr())) {
388  // If same query, remove old version and break
389  if (qr->IsDraw()) {
390  fDrawQueries--;
391  fQueryResults->Remove(qr);
392  delete qr;
393  break;
394  }
395  }
396  }
397  // Add new draw query
398  if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
399  fDrawQueries++;
400  if (!fQueryResults)
401  fQueryResults = new TList;
402  fQueryResults->Add(q);
403  }
404  }
405 }
406 
407 ////////////////////////////////////////////////////////////////////////////////
408 /// Remove all query result instances referenced 'ref' from
409 /// the list of results.
410 
411 void TProofPlayer::RemoveQueryResult(const char *ref)
412 {
413  if (fQueryResults) {
414  TIter nxq(fQueryResults);
415  TQueryResult *qr = 0;
416  while ((qr = (TQueryResult *) nxq())) {
417  if (qr->Matches(ref)) {
418  fQueryResults->Remove(qr);
419  delete qr;
420  }
421  }
422  }
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Get query result instances referenced 'ref' from
427 /// the list of results.
428 
430 {
431  if (fQueryResults) {
432  if (ref && strlen(ref) > 0) {
433  TIter nxq(fQueryResults);
434  TQueryResult *qr = 0;
435  while ((qr = (TQueryResult *) nxq())) {
436  if (qr->Matches(ref))
437  return qr;
438  }
439  } else {
440  // Get last
441  return (TQueryResult *) fQueryResults->Last();
442  }
443  }
444 
445  // Nothing found
446  return (TQueryResult *)0;
447 }
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Set current query and save previous value.
451 
453 {
455  fQuery = q;
456 }
457 
458 ////////////////////////////////////////////////////////////////////////////////
459 /// Add object to input list.
460 
462 {
463  fInput->Add(inp);
464 }
465 
466 ////////////////////////////////////////////////////////////////////////////////
467 /// Clear input list.
468 
470 {
471  fInput->Clear();
472 }
473 
474 ////////////////////////////////////////////////////////////////////////////////
475 /// Get output object by name.
476 
478 {
479  if (fOutput)
480  return fOutput->FindObject(name);
481  return 0;
482 }
483 
484 ////////////////////////////////////////////////////////////////////////////////
485 /// Get output list.
486 
488 {
489  TList *ol = fOutput;
490  if (!ol && fQuery)
491  ol = fQuery->GetOutputList();
492  return ol;
493 }
494 
495 ////////////////////////////////////////////////////////////////////////////////
496 /// Reinitialize fSelector using the selector files in the query result.
497 /// Needed when Finalize is called after a Process execution for the same
498 /// selector name.
499 
501 {
502  Int_t rc = 0;
503 
504  // Make sure we have a query
505  if (!qr) {
506  Info("ReinitSelector", "query undefined - do nothing");
507  return -1;
508  }
509 
510  // Selector name
511  TString selec = qr->GetSelecImp()->GetName();
512  if (selec.Length() <= 0) {
513  Info("ReinitSelector", "selector name undefined - do nothing");
514  return -1;
515  }
516 
517  // Find out if this is a standard selection used for Draw actions
518  Bool_t stdselec = TSelector::IsStandardDraw(selec);
519 
520  // Find out if this is a precompiled selector: in such a case we do not
521  // have the code in TMacros, so we must rely on local libraries
522  Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
523 
524  // If not, find out if it needs to be expanded
525  TString ipathold;
526  if (!stdselec && !compselec) {
527  // Check checksums for the versions of the selector files
528  Bool_t expandselec = kTRUE;
529  TString dir, ipath;
530  char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
531  if (selc) {
532  // Check checksums
533  TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
534  // Implementation files
535  md5icur = TMD5::FileChecksum(selc);
536  md5iold = qr->GetSelecImp()->Checksum();
537  // Header files
538  TString selh(selc);
539  Int_t dot = selh.Last('.');
540  if (dot != kNPOS) selh.Remove(dot);
541  selh += ".h";
543  md5hcur = TMD5::FileChecksum(selh);
544  md5hold = qr->GetSelecHdr()->Checksum();
545 
546  // If nothing has changed nothing to do
547  if (md5hcur && md5hold && md5icur && md5iold)
548  if (*md5hcur == *md5hold && *md5icur == *md5iold)
549  expandselec = kFALSE;
550 
551  SafeDelete(md5icur);
552  SafeDelete(md5hcur);
553  SafeDelete(md5iold);
554  SafeDelete(md5hold);
555  if (selc) delete [] selc;
556  }
557 
558  Bool_t ok = kTRUE;
559  // Expand selector files, if needed
560  if (expandselec) {
561 
562  ok = kFALSE;
563  // Expand files in a temporary directory
564  TUUID u;
565  dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
566  if (!(gSystem->MakeDirectory(dir))) {
567 
568  // Export implementation file
569  selec = Form("%s/%s",dir.Data(),selec.Data());
570  qr->GetSelecImp()->SaveSource(selec);
571 
572  // Export header file
573  TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
574  qr->GetSelecHdr()->SaveSource(seleh);
575 
576  // Adjust include path
577  ipathold = gSystem->GetIncludePath();
578  ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
579  gSystem->SetIncludePath(ipath.Data());
580 
581  ok = kTRUE;
582  }
583  }
584  TString opt(qr->GetOptions());
585  Ssiz_t id = opt.Last('#');
586  if (id != kNPOS && id < opt.Length() - 1)
587  selec += opt(id + 1, opt.Length());
588 
589  if (!ok) {
590  Info("ReinitSelector", "problems locating or exporting selector files");
591  return -1;
592  }
593  }
594 
595  // Cleanup previous stuff
597  fSelectorClass = 0;
598 
599  // Init the selector now
600  Int_t iglevelsave = gErrorIgnoreLevel;
601  if (compselec)
602  // Silent error printout on first attempt
604 
605  if ((fSelector = TSelector::GetSelector(selec))) {
606  if (compselec)
607  gErrorIgnoreLevel = iglevelsave; // restore ignore level
608  fSelectorClass = fSelector->IsA();
610 
611  } else {
612  if (compselec) {
613  gErrorIgnoreLevel = iglevelsave; // restore ignore level
614  // Retry by loading first the libraries listed in TQueryResult, if any
615  if (strlen(qr->GetLibList()) > 0) {
616  TString sl(qr->GetLibList());
617  TObjArray *oa = sl.Tokenize(" ");
618  if (oa) {
619  Bool_t retry = kFALSE;
620  TIter nxl(oa);
621  TObjString *os = 0;
622  while ((os = (TObjString *) nxl())) {
623  TString lib = gSystem->BaseName(os->GetName());
624  if (lib != "lib") {
625  lib.ReplaceAll("-l", "lib");
626  if (gSystem->Load(lib) == 0)
627  retry = kTRUE;
628  }
629  }
630  // Retry now, if the case
631  if (retry)
633  }
634  }
635  }
636  if (!fSelector) {
637  if (compselec)
638  Info("ReinitSelector", "compiled selector re-init failed:"
639  " automatic reload unsuccessful:"
640  " please load manually the correct library");
641  rc = -1;
642  }
643  }
644  if (fSelector) {
645  // Draw needs to reinit temp histos
647  if (stdselec) {
648  ((TProofDraw *)fSelector)->DefVar();
649  } else {
650  // variables may have been initialized in Begin()
651  fSelector->Begin(0);
652  }
653  }
654 
655  // Restore original include path, if needed
656  if (ipathold.Length() > 0)
657  gSystem->SetIncludePath(ipathold.Data());
658 
659  return rc;
660 }
661 
662 ////////////////////////////////////////////////////////////////////////////////
663 /// Incorporate output object (may not be used in this class).
664 
666 {
667  MayNotUse("AddOutputObject");
668  return -1;
669 }
670 
671 ////////////////////////////////////////////////////////////////////////////////
672 /// Incorporate output list (may not be used in this class).
673 
675 {
676  MayNotUse("AddOutput");
677 }
678 
679 ////////////////////////////////////////////////////////////////////////////////
680 /// Store output list (may not be used in this class).
681 
683 {
684  MayNotUse("StoreOutput");
685 }
686 
687 ////////////////////////////////////////////////////////////////////////////////
688 /// Store feedback list (may not be used in this class).
689 
691 {
692  MayNotUse("StoreFeedback");
693 }
694 
695 ////////////////////////////////////////////////////////////////////////////////
696 /// Report progress (may not be used in this class).
697 
698 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
699 {
700  MayNotUse("Progress");
701 }
702 
703 ////////////////////////////////////////////////////////////////////////////////
704 /// Report progress (may not be used in this class).
705 
706 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
707  Long64_t /*bytesread*/,
708  Float_t /*evtRate*/, Float_t /*mbRate*/,
709  Float_t /*evtrti*/, Float_t /*mbrti*/)
710 {
711  MayNotUse("Progress");
712 }
713 
714 ////////////////////////////////////////////////////////////////////////////////
715 /// Report progress (may not be used in this class).
716 
718 {
719  MayNotUse("Progress");
720 }
721 
722 ////////////////////////////////////////////////////////////////////////////////
723 /// Set feedback list (may not be used in this class).
724 
726 {
727  MayNotUse("Feedback");
728 }
729 
730 ////////////////////////////////////////////////////////////////////////////////
731 /// Draw feedback creation proxy. When accessed via TProof avoids
732 /// link dependency on libProofPlayer.
733 
735 {
736  return new TDrawFeedback(p);
737 }
738 
739 ////////////////////////////////////////////////////////////////////////////////
740 /// Set draw feedback option.
741 
743 {
744  if (f)
745  f->SetOption(opt);
746 }
747 
748 ////////////////////////////////////////////////////////////////////////////////
749 /// Delete draw feedback object.
750 
752 {
753  delete f;
754 }
755 
756 ////////////////////////////////////////////////////////////////////////////////
757 /// Save the partial results of this query to a dedicated file under the user
758 /// data directory. The file name has the form
759 /// <session_tag>.q<query_seq_num>.root
760 /// The file pat and the file are created if not existing already.
761 /// Only objects in the outputlist not being TProofOutputFile are saved.
762 /// The packets list 'packets' is saved if given.
763 /// Trees not attached to any file are attached to the open file.
764 /// If 'queryend' is kTRUE evrything is written out (TTrees included).
765 /// The actual saving action is controlled by 'force' and by fSavePartialResults /
766 /// fSaveResultsPerPacket:
767 ///
768 /// fSavePartialResults = kFALSE/kTRUE no-saving/saving
769 /// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
770 ///
771 /// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
772 /// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
773 /// are always saved and expensive calls to TSystem::GetProcInfo saved.
774 /// The switch fSaveResultsPerPacket is instead controlled by the user or admin
775 /// who can also force saving in all cases; parameter PROOF_SavePartialResults or
776 /// RC env ProofPlayer.SavePartialResults .
777 /// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
778 /// are ignored.
779 /// Return -1 in case of problems, 0 otherwise.
780 
782 {
783  Bool_t save = (force || (fSavePartialResults &&
784  (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
785  if (!save) {
786  PDB(kOutput, 2)
787  Info("SavePartialResults", "partial result saving disabled");
788  return 0;
789  }
790 
791  // Sanity check
792  if (!gProofServ) {
793  Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
794  return -1;
795  }
796  if (!fOutput) {
797  Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
798  return -1;
799  }
800 
801  PDB(kOutput, 1)
802  Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
803  queryend, force, fSavePartialResults, fSaveResultsPerPacket);
804 
805  // Get list of processed packets from the iterator
806  PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
807 
808  TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
809  PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
810  packets, (packets ? packets->GetSize(): -1));
811 
812  // Open the file
813  const char *oopt = "UPDATE";
814  // Check if the file has already been defined
815  TString baseName(fOutputFilePath);
816  if (fOutputFilePath.IsNull()) {
817  baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
818  if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
819  fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
821  } else {
822  fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
823  }
824  Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
825  oopt = "RECREATE";
826  }
827  // Open the file in write mode
828  if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
829  (fOutputFile && fOutputFile->IsZombie())) {
830  Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
832  return -1;
833  }
834 
835  // Save current directory
836  TDirectory *curdir = gDirectory;
837  fOutputFile->cd();
838 
839  // Write first the packets list, if required
840  if (packets) {
841  TDirectory *packetsDir = fOutputFile->mkdir("packets");
842  if (packetsDir) packetsDir->cd();
844  fOutputFile->cd();
845  }
846 
847  Bool_t notempty = kFALSE;
848  // Write out the output list
849  TList torm;
850  TIter nxo(fOutput);
851  TObject *o = 0;
852  while ((o = nxo())) {
853  // Skip output file drivers
854  if (o->InheritsFrom(TProofOutputFile::Class())) continue;
855  // Skip control objets
856  if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
857  // Skip data members mapping
859  // Skip missing file info
860  if (!strcmp(o->GetName(), "MissingFiles")) continue;
861  // Trees need a special treatment
862  if (o->InheritsFrom("TTree")) {
863  TTree *t = (TTree *) o;
864  TDirectory *d = t->GetDirectory();
865  // If the tree is not attached to any file ...
866  if (!d || (d && !d->InheritsFrom("TFile"))) {
867  // ... we attach it
869  }
870  if (t->GetDirectory() == fOutputFile) {
871  if (queryend) {
872  // ... we write it out
873  o->Write(0, TObject::kOverwrite);
874  // At least something in the file
875  notempty = kTRUE;
876  // Flag for removal from the outputlist
877  torm.Add(o);
878  // Prevent double-deletion attempts
879  t->SetDirectory(0);
880  } else {
881  // ... or we set in automatic flush mode
882  t->SetAutoFlush();
883  }
884  }
885  } else if (queryend || fSaveResultsPerPacket) {
886  // Save overwriting what's already there
887  o->Write(0, TObject::kOverwrite);
888  // At least something in the file
889  notempty = kTRUE;
890  // Flag for removal from the outputlist
891  if (queryend) torm.Add(o);
892  }
893  }
894 
895  // Restore previous directory
896  gDirectory = curdir;
897 
898  // Close the file if required
899  if (notempty) {
900  if (!fOutput->FindObject(baseName)) {
901  TProofOutputFile *po = 0;
902  // Get directions
903  TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
904  TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
905  if (nm && oname.BeginsWith("ds:")) {
906  oname.Replace(0, 3, "");
907  TString qtag =
909  oname.ReplaceAll("<qtag>", qtag);
910  // Create the TProofOutputFile for dataset creation
911  po = new TProofOutputFile(baseName, "DRO", oname.Data());
912  } else {
913  Bool_t hasddir = kFALSE;
914  // Create the TProofOutputFile for automatic merging
915  po = new TProofOutputFile(baseName, "M");
916  if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
917  if (gProofServ->IsTopMaster()) {
918  if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
919  TString dsrv;
921  TProofServ::FilterLocalroot(oname, dsrv);
922  oname.Insert(0, dsrv);
923  }
924  } else {
925  if (nm) {
926  // The name has been sent by the client: resolve local place holders
927  oname.ReplaceAll("<file>", baseName);
928  } else {
929  // We did not get any indication; the final file will be in the datadir on
930  // the top master and it will be resolved there
931  oname.Form("<datadir>/%s", baseName.Data());
932  hasddir = kTRUE;
933  }
934  }
935  po->SetOutputFileName(oname.Data());
936  if (hasddir)
937  // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
939  po->SetName(gSystem->BaseName(oname.Data()));
940  }
941  po->AdoptFile(fOutputFile);
942  fOutput->Add(po);
943  // Flag the nature of this file
945  }
946  }
947  fOutputFile->Close();
949 
950  // If last call, cleanup the output list from objects saved to file
951  if (queryend && torm.GetSize() > 0) {
952  TIter nxrm(&torm);
953  while ((o = nxrm())) { fOutput->Remove(o); }
954  }
955  torm.SetOwner(kFALSE);
956 
957  PDB(kOutput, 1)
958  Info("SavePartialResults", "partial results saved to file");
959  // We are done
960  return 0;
961 }
962 
963 ////////////////////////////////////////////////////////////////////////////////
964 /// Make sure that a valid selector object
965 /// Return -1 in case of problems, 0 otherwise
966 
967 Int_t TProofPlayer::AssertSelector(const char *selector_file)
968 {
969  if (selector_file && strlen(selector_file)) {
971 
972  // Get selector files from cache
973  TString ocwd = gSystem->WorkingDirectory();
974  if (gProofServ) {
977  }
978 
979  fSelector = TSelector::GetSelector(selector_file);
980 
981  if (gProofServ) {
982  gSystem->ChangeDirectory(ocwd);
984  }
985 
986  if (!fSelector) {
987  Error("AssertSelector", "cannot load: %s", selector_file );
988  return -1;
989  }
990 
992  Info("AssertSelector", "Processing via filename (%s)", selector_file);
993  } else if (!fSelector) {
994  Error("AssertSelector", "no TSelector object define : cannot continue!");
995  return -1;
996  } else {
997  Info("AssertSelector", "Processing via TSelector object");
998  }
999  // Done
1000  return 0;
1001 }
1002 ////////////////////////////////////////////////////////////////////////////////
1003 /// Update fProgressStatus
1004 
1006 {
1007  if (fProgressStatus) {
1012  if (gMonitoringWriter)
1015  fProcessedRun = 0;
1016  }
1017 }
1018 
1019 ////////////////////////////////////////////////////////////////////////////////
1020 /// Process specified TDSet on PROOF worker.
1021 /// The return value is -1 in case of error and TSelector::GetStatus()
1022 /// in case of success.
1023 
1024 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1025  Option_t *option, Long64_t nentries,
1026  Long64_t first)
1027 {
1028  PDB(kGlobal,1) Info("Process","Enter");
1029 
1031  fOutput = 0;
1032 
1033  TCleanup clean(this);
1034 
1035  fSelectorClass = 0;
1036  TString wmsg;
1037  TRY {
1038  if (AssertSelector(selector_file) != 0 || !fSelector) {
1039  Error("Process", "cannot assert the selector object");
1040  return -1;
1041  }
1042 
1043  fSelectorClass = fSelector->IsA();
1044  Int_t version = fSelector->Version();
1045  if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1046 
1048 
1049  if (gProofServ)
1051 
1052  fSelStatus = new TStatus;
1054 
1055  fSelector->SetOption(option);
1057 
1058  // If in sequential (0-PROOF) mode validate the data set to get
1059  // the number of entries
1061  if (fTotalEvents < 0 && gProofServ &&
1063  dset->Validate();
1064  dset->Reset();
1065  TDSetElement *e = 0;
1066  while ((e = dset->Next())) {
1067  fTotalEvents += e->GetNum();
1068  }
1069  }
1070 
1071  dset->Reset();
1072 
1073  // Set parameters controlling the iterator behaviour
1074  Int_t useTreeCache = 1;
1075  if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1076  if (useTreeCache > -1 && useTreeCache < 2)
1077  gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1078  }
1079  Long64_t cacheSize = -1;
1080  if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1081  TString sz = TString::Format("%lld", cacheSize);
1082  gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1083  }
1084  // Parallel unzipping
1085  Int_t useParallelUnzip = 0;
1086  if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1087  if (useParallelUnzip > -1 && useParallelUnzip < 2)
1088  gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1089  }
1090  // OS file caching (Mac Os X only)
1091  Int_t dontCacheFiles = 0;
1092  if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1093  if (dontCacheFiles == 1)
1094  gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1095  }
1096  fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
1097 
1098  // Control file object swap
1099  // <how>*10 + <force>
1100  // <how> = 0 end of run
1101  // 1 after each packet
1102  // <force> = 0 no, swap only if memory threshold is reached
1103  // 1 swap in all cases, accordingly to <how>
1104  Int_t opt = 0;
1105  if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1106  opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1107  }
1108  fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1109  fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1110  Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1111 
1112  // Memory threshold for file object swap
1113  Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1114  if (memfrac > 0.) {
1115  // The threshold is per core
1116  SysInfo_t si;
1117  if (gSystem->GetSysInfo(&si) == 0) {
1118  fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1119  Info("Process", "memory threshold for saving objects to file set to %ld kB",
1121  } else {
1122  Error("Process", "cannot get SysInfo_t (!)");
1123  }
1124  }
1125 
1126  if (version == 0) {
1127  PDB(kLoop,1) Info("Process","Call Begin(0)");
1128  fSelector->Begin(0);
1129  } else {
1130  if (IsClient()) {
1131  // on client (for local run)
1132  PDB(kLoop,1) Info("Process","Call Begin(0)");
1133  fSelector->Begin(0);
1134  }
1136  PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1137  fSelector->SlaveBegin(0); // Init is called explicitly
1138  // from GetNextEvent()
1139  }
1140  }
1141 
1142  } CATCH(excode) {
1144  Error("Process","exception %d caught", excode);
1146  return -1;
1147  } ENDTRY;
1148 
1149  // Save the results, if needed, closing the file
1150  if (SavePartialResults(kFALSE) < 0)
1151  Warning("Process", "problems seetting up file-object swapping");
1152 
1153  // Create feedback lists, if required
1154  SetupFeedback();
1155 
1156  if (gMonitoringWriter)
1158 
1159  PDB(kLoop,1)
1160  Info("Process","Looping over Process()");
1161 
1162  // get the byte read counter at the beginning of processing
1165  fProcessedRun = 0;
1166  // force the first monitoring info
1167  if (gMonitoringWriter)
1169 
1170  // Start asynchronous timer to dispatch pending events
1172 
1173  // Loop over range
1174  gAbort = kFALSE;
1175  Long64_t entry;
1178 
1179  TRY {
1180 
1181  Int_t mrc = -1;
1182  // Get the frequency for checking memory consumption and logging information
1183  Long64_t memlogfreq = -1;
1184  if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1185  Long64_t singleshot = 1;
1186  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1187  TString lastMsg("(unfortunately no detailed info is available about current packet)");
1188 
1189  // Initial memory footprint
1190  if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1191  Error("Process", "%s", wmsg.Data());
1192  wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ->GetOrdinal()));
1193  fSelStatus->Add(wmsg.Data());
1194  if (gProofServ) {
1195  gProofServ->SendAsynMessage(wmsg.Data());
1197  }
1200  } else if (!wmsg.IsNull()) {
1201  Warning("Process", "%s", wmsg.Data());
1202  }
1203 
1204  TPair *currentElem = 0;
1205  // The event loop on the worker
1206  Long64_t fst = -1, num;
1207  Long_t maxproctime = -1;
1208  Bool_t newrun = kFALSE;
1209  while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1212  // This is needed by the inflate infrastructure to calculate
1213  // sleeping times
1215 
1216  // Give the possibility to the selector to access additional info in the
1217  // incoming packet
1218  if (dset->Current()) {
1219  if (!currentElem) {
1220  currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1221  fInput->Add(currentElem);
1222  } else {
1223  if (currentElem->Value() != dset->Current()) {
1224  currentElem->SetValue(dset->Current());
1225  } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1227  }
1228  }
1229  if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1230  if (dset->TestBit(TDSet::kEmpty)) {
1231  lastMsg = "check logs for possible stacktrace - last cycle:";
1232  } else {
1233  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1234  TString fn = (elem) ? elem->GetFileName() : "<undef>";
1235  lastMsg.Form("while processing dset:'%s', file:'%s'"
1236  " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1237  }
1238  TProofServ::SetLastMsg(lastMsg);
1239  }
1240  // Set the max proc time, if any
1241  if (dset->Current()->GetMaxProcTime() >= 0.)
1242  maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1243  newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1244  }
1245 
1248  // Setup packet proc time measurement
1249  if (maxproctime > 0) {
1250  if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1251  fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1252  if (!fProcTime) fProcTime = new TStopwatch();
1253  fProcTime->Reset(); // Reset counters
1254  }
1255  Long64_t refnum = num;
1256  if (refnum < 0 && maxproctime <= 0) {
1257  wmsg.Form("neither entries nor max proc time specified:"
1258  " risk of infinite loop: processing aborted");
1259  Error("Process", "%s", wmsg.Data());
1260  if (gProofServ) {
1261  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1263  gProofServ->SendAsynMessage(wmsg.Data());
1264  }
1267  break;
1268  }
1269  while (refnum < 0 || num--) {
1270 
1271  // Did we use all our time?
1273  fProcTime->Stop();
1274  if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1275  // How much are we left with?
1276  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1277  if (xleft < 0.2) {
1278  // Give another try, 1.5 times the remaining measured expected time
1279  Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1281  fProcTimeTimer->Start(mpt, kTRUE); // One shot
1283  }
1284  }
1286  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1287  maxproctime, lastMsg.Data());
1288 
1289  break;
1290  }
1291  }
1292 
1293  if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1294  fSelector->GetAbort() == TSelector::kContinue)) break;
1295 
1296  // Get the netry number, taking into account entry or event lists
1297  entry = fEvIter->GetEntryNumber(fst);
1298  fst++;
1299 
1300  // Set the last entry
1301  TProofServ::SetLastEntry(entry);
1302 
1303  if (fSelector->Version() == 0) {
1304  PDB(kLoop,3)
1305  Info("Process","Call ProcessCut(%lld)", entry);
1306  if (fSelector->ProcessCut(entry)) {
1307  PDB(kLoop,3)
1308  Info("Process","Call ProcessFill(%lld)", entry);
1309  fSelector->ProcessFill(entry);
1310  }
1311  } else {
1312  PDB(kLoop,3)
1313  Info("Process","Call Process(%lld)", entry);
1314  fSelector->Process(entry);
1317  break;
1318  } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1319  Info("Process", "packet processing aborted following the"
1320  " selector settings:\n%s", lastMsg.Data());
1323  }
1324  }
1326 
1327  // Check the memory footprint, if required
1328  if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1329  if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1330  Error("Process", "%s", wmsg.Data());
1331  if (gProofServ) {
1332  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1333  gProofServ->GetOrdinal(), entry));
1334  gProofServ->SendAsynMessage(wmsg.Data());
1335  }
1339  break;
1340  } else {
1341  if (!wmsg.IsNull()) {
1342  Warning("Process", "%s", wmsg.Data());
1343  if (gProofServ) {
1344  wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1345  gProofServ->GetOrdinal(), entry));
1346  gProofServ->SendAsynMessage(wmsg.Data());
1347  }
1348  }
1349  }
1350  }
1354  }
1356  if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1357 
1358  // Make sure that the selector abort status is reset
1360  fSelector->Abort("status reset", TSelector::kContinue);
1361  }
1362  }
1363 
1364  } CATCH(excode) {
1365  if (excode == kPEX_STOPPED) {
1366  Info("Process","received stop-process signal");
1368  } else if (excode == kPEX_ABORTED) {
1369  gAbort = kTRUE;
1370  Info("Process","received abort-process signal");
1372  } else {
1373  Error("Process","exception %d caught", excode);
1374  // Perhaps we need a dedicated status code here ...
1375  gAbort = kTRUE;
1377  }
1379  } ENDTRY;
1380 
1381  // Clean-up the envelop for the current element
1382  TPair *currentElem = 0;
1383  if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1384  if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1385  delete currentElem->Key();
1386  delete currentElem;
1387  }
1388  }
1389 
1390  // Final memory footprint
1391  Long64_t singleshot = 1;
1392  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1393  Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1394  if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1395 
1396  PDB(kGlobal,2)
1397  Info("Process","%lld events processed", fProgressStatus->GetEntries());
1398 
1399  if (gMonitoringWriter) {
1403  }
1404 
1405  // Stop active timers
1407  if (fStopTimer != 0)
1409  if (fFeedbackTimer != 0)
1410  HandleTimer(0);
1411 
1412  StopFeedback();
1413 
1414  // Save the results, if needed, closing the file
1415  if (SavePartialResults(kTRUE) < 0)
1416  Warning("Process", "problems saving the results to file");
1417 
1419 
1420  // Finalize
1421 
1422  if (fExitStatus != kAborted) {
1423 
1424  TIter nxo(GetOutputList());
1425  TObject *o = 0;
1426  while ((o = nxo())) {
1427  // Special treatment for files
1428  if (o->IsA() == TProofOutputFile::Class()) {
1430  of->Print();
1432  const char *dir = of->GetDir();
1433  if (!dir || (dir && strlen(dir) <= 0)) {
1435  } else if (dir && strlen(dir) > 0) {
1436  TUrl u(dir);
1437  if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1438  !strcmp(u.GetHost(), "localhost.localdomain")) {
1439  u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
1440  of->SetDir(u.GetUrl(kTRUE));
1441  }
1442  of->Print();
1443  }
1444  }
1445  }
1446 
1448 
1450  if (fSelector->Version() == 0) {
1451  PDB(kLoop,1) Info("Process","Call Terminate()");
1452  fSelector->Terminate();
1453  } else {
1454  PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1457  PDB(kLoop,1) Info("Process","Call Terminate()");
1458  fSelector->Terminate();
1459  }
1460  }
1461  }
1462 
1463  // Add Selector status in the output list so it can be returned to the client as done
1464  // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1465  fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1466 
1467  if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1468  TIter nxc(gROOT->GetListOfCanvases());
1469  while (TObject *c = nxc())
1470  fOutput->Add(c);
1471  }
1472  }
1473 
1474  if (gProofServ)
1475  TPerfStats::Stop();
1476 
1477  return 0;
1478 }
1479 
1480 ////////////////////////////////////////////////////////////////////////////////
1481 /// Process specified TDSet on PROOF worker with TSelector object
1482 /// The return value is -1 in case of error and TSelector::GetStatus()
1483 /// in case of success.
1484 
1486  Option_t *option, Long64_t nentries,
1487  Long64_t first)
1488 {
1489  if (!selector) {
1490  Error("Process", "selector object undefiend!");
1491  return -1;
1492  }
1493 
1495  fSelector = selector;
1497  return Process(dset, (const char *)0, option, nentries, first);
1498 }
1499 
1500 ////////////////////////////////////////////////////////////////////////////////
1501 /// Not implemented: meaningful only in the remote player. Returns kFALSE.
1502 
1504 {
1505  return kFALSE;
1506 }
1507 
1508 ////////////////////////////////////////////////////////////////////////////////
1509 /// Check the memory usage, if requested.
1510 /// Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
1511 /// resident limits are depassed.
1512 
1514  Bool_t &w80v, TString &wmsg)
1515 {
1516  Long64_t processed = GetEventsProcessed() + fProcessedRun;
1517  if (mfreq > 0 && processed%mfreq == 0) {
1518  // Record the memory information
1519  ProcInfo_t pi;
1520  if (!gSystem->GetProcInfo(&pi)){
1521  wmsg = "";
1522  if (gProofServ)
1523  Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1524  pi.fMemVirtual, pi.fMemResident, processed);
1525  // Save info in TStatus
1527  // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
1528  if (TProofServ::GetVirtMemMax() > 0) {
1530  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1531  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1532  return kFALSE;
1533  } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1534  // Refine monitoring
1535  mfreq = 1;
1536  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1537  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1538  w80v = kFALSE;
1539  }
1540  }
1541  // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
1542  if (TProofServ::GetResMemMax() > 0) {
1544  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1545  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1546  return kFALSE;
1547  } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1548  // Refine monitoring
1549  mfreq = 1;
1550  if (wmsg.Length() > 0) {
1551  wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1552  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1553  } else {
1554  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1555  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1556  }
1557  w80r = kFALSE;
1558  }
1559  }
1560  // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1561  // to TSystem::GetProcInfo in SavePartialResults
1563  }
1564  }
1565  // Done
1566  return kTRUE;
1567 }
1568 
1569 ////////////////////////////////////////////////////////////////////////////////
1570 /// Finalize query (may not be used in this class).
1571 
1573 {
1574  MayNotUse("Finalize");
1575  return -1;
1576 }
1577 
1578 ////////////////////////////////////////////////////////////////////////////////
1579 /// Finalize query (may not be used in this class).
1580 
1582 {
1583  MayNotUse("Finalize");
1584  return -1;
1585 }
1586 ////////////////////////////////////////////////////////////////////////////////
1587 /// Merge output (may not be used in this class).
1588 
1590 {
1591  MayNotUse("MergeOutput");
1592  return;
1593 }
1594 
1595 ////////////////////////////////////////////////////////////////////////////////
1596 
1598 {
1600  fOutput->Add(olsdm);
1601 }
1602 
1603 ////////////////////////////////////////////////////////////////////////////////
1604 /// Update automatic binning parameters for given object "name".
1605 
1609  Double_t& zmin, Double_t& zmax)
1610 {
1611  if ( fAutoBins == 0 ) {
1612  fAutoBins = new THashList;
1613  }
1614 
1615  TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1616 
1617  if ( val == 0 ) {
1618  //look for info in higher master
1619  if (gProofServ && !gProofServ->IsTopMaster()) {
1620  TString key = name;
1621  TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
1622  }
1623 
1624  val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1625  fAutoBins->Add(val);
1626  } else {
1627  val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1628  }
1629 }
1630 
1631 ////////////////////////////////////////////////////////////////////////////////
1632 /// Get next packet (may not be used in this class).
1633 
1635 {
1636  MayNotUse("GetNextPacket");
1637  return 0;
1638 }
1639 
1640 ////////////////////////////////////////////////////////////////////////////////
1641 /// Set up feedback (may not be used in this class).
1642 
1644 {
1645  MayNotUse("SetupFeedback");
1646 }
1647 
1648 ////////////////////////////////////////////////////////////////////////////////
1649 /// Stop feedback (may not be used in this class).
1650 
1652 {
1653  MayNotUse("StopFeedback");
1654 }
1655 
1656 ////////////////////////////////////////////////////////////////////////////////
1657 /// Draw (may not be used in this class).
1658 
1659 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1660  const char * /*selection*/, Option_t * /*option*/,
1661  Long64_t /*nentries*/, Long64_t /*firstentry*/)
1662 {
1663  MayNotUse("DrawSelect");
1664  return -1;
1665 }
1666 
1667 ////////////////////////////////////////////////////////////////////////////////
1668 /// Handle tree header request.
1669 
1671 {
1672  MayNotUse("HandleGetTreeHeader|");
1673 }
1674 
1675 ////////////////////////////////////////////////////////////////////////////////
1676 /// Receive histo from slave.
1677 
1679 {
1680  TObject *obj = mess->ReadObject(mess->GetClass());
1681  if (obj->InheritsFrom(TH1::Class())) {
1682  TH1 *h = (TH1*)obj;
1683  h->SetDirectory(0);
1684  TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1685  if (horg)
1686  horg->Add(h);
1687  else
1689  }
1690 }
1691 
1692 ////////////////////////////////////////////////////////////////////////////////
1693 /// Draw the object if it is a canvas.
1694 /// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1695 /// is not available.
1696 
1698 {
1699  static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1700 
1701  // Load the library the first time
1702  if (!gDrawCanvasHook) {
1703  // Load library needed for graphics ...
1704  TString drawlib = "libProofDraw";
1705  char *p = 0;
1706  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1707  delete[] p;
1708  if (gSystem->Load(drawlib) != -1) {
1709  // Locate DrawCanvas
1710  Func_t f = 0;
1711  if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1712  gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1713  else
1714  Warning("DrawCanvas", "can't find DrawCanvas");
1715  } else
1716  Warning("DrawCanvas", "can't load %s", drawlib.Data());
1717  } else
1718  Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1719  }
1720  if (gDrawCanvasHook && obj)
1721  return (*gDrawCanvasHook)(obj);
1722  // No drawing hook or object undefined
1723  return 1;
1724 }
1725 
1726 ////////////////////////////////////////////////////////////////////////////////
1727 /// Parse the arguments from var, sel and opt and fill the selector and
1728 /// object name accordingly.
1729 /// Return 0 in case of success, 1 if libProofDraw is not available.
1730 
1731 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1732  TString &selector, TString &objname)
1733 {
1734  static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1735  TString &, TString &) = 0;
1736 
1737  // Load the library the first time
1738  if (!gGetDrawArgsHook) {
1739  // Load library needed for graphics ...
1740  TString drawlib = "libProofDraw";
1741  char *p = 0;
1742  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1743  delete[] p;
1744  if (gSystem->Load(drawlib) != -1) {
1745  // Locate GetDrawArgs
1746  Func_t f = 0;
1747  if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1748  gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1749  TString &, TString &))(f);
1750  else
1751  Warning("GetDrawArgs", "can't find GetDrawArgs");
1752  } else
1753  Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1754  } else
1755  Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1756  }
1757  if (gGetDrawArgsHook)
1758  return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1759  // No parser hook or object undefined
1760  return 1;
1761 }
1762 
1763 ////////////////////////////////////////////////////////////////////////////////
1764 /// Create/destroy a named canvas for feedback
1765 
1766 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
1767 {
1768  static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1769 
1770  // Load the library the first time
1771  if (!gFeedBackCanvasHook) {
1772  // Load library needed for graphics ...
1773  TString drawlib = "libProofDraw";
1774  char *p = 0;
1775  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1776  delete[] p;
1777  if (gSystem->Load(drawlib) != -1) {
1778  // Locate FeedBackCanvas
1779  Func_t f = 0;
1780  if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1781  gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1782  else
1783  Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1784  } else
1785  Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1786  } else
1787  Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1788  }
1789  if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1790  // No parser hook or object undefined
1791  return;
1792 }
1793 
1794 ////////////////////////////////////////////////////////////////////////////////
1795 /// Return the size in bytes of the cache
1796 
1798 {
1799  if (fEvIter) return fEvIter->GetCacheSize();
1800  return -1;
1801 }
1802 
1803 ////////////////////////////////////////////////////////////////////////////////
1804 /// Return the number of entries in the learning phase
1805 
1807 {
1808  if (fEvIter) return fEvIter->GetLearnEntries();
1809  return -1;
1810 }
1811 
1812 ////////////////////////////////////////////////////////////////////////////////
1813 /// Switch on/off merge timer
1814 
1816 {
1817  if (on) {
1818  if (!fMergeSTW) fMergeSTW = new TStopwatch();
1819  PDB(kGlobal,1)
1820  Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1821  if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1822  fNumMergers = fProof->fMergersCount;
1823  } else if (fMergeSTW) {
1824  fMergeSTW->Stop();
1825  Float_t rt = fMergeSTW->RealTime();
1826  PDB(kGlobal,1)
1827  Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1828  if (fQuery) {
1829  if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1830  // On the master (or in Lite()) we set the merging time and the numebr of mergers
1831  fQuery->SetMergeTime(rt);
1832  fQuery->SetNumMergers(fNumMergers);
1833  } else {
1834  // In a standard client we save the transfer-to-client time
1835  fQuery->SetRecvTime(rt);
1836  }
1837  PDB(kGlobal,2) fQuery->Print("F");
1838  }
1839  }
1840 }
1841 
1842 //------------------------------------------------------------------------------
1843 
1845 
1846 ////////////////////////////////////////////////////////////////////////////////
1847 /// Process the specified TSelector object 'nentries' times.
1848 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1849 /// local session.
1850 /// The return value is -1 in case of error and TSelector::GetStatus()
1851 /// in case of success.
1852 
1854  Long64_t nentries, Option_t *option)
1855 {
1856  if (!selector) {
1857  Error("Process", "selector object undefiend!");
1858  return -1;
1859  }
1860 
1861  TDSetProxy *set = new TDSetProxy("", "", "");
1862  set->SetBit(TDSet::kEmpty);
1863  set->SetBit(TDSet::kIsLocal);
1864  Long64_t rc = Process(set, selector, option, nentries);
1865  SafeDelete(set);
1866 
1867  // Done
1868  return rc;
1869 }
1870 
1871 ////////////////////////////////////////////////////////////////////////////////
1872 /// Process the specified TSelector file 'nentries' times.
1873 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1874 /// local session.
1875 /// Process specified TDSet on PROOF worker with TSelector object
1876 /// The return value is -1 in case of error and TSelector::GetStatus()
1877 /// in case of success.
1878 
1880  Long64_t nentries, Option_t *option)
1881 {
1882  TDSetProxy *set = new TDSetProxy("", "", "");
1883  set->SetBit(TDSet::kEmpty);
1884  set->SetBit(TDSet::kIsLocal);
1885  Long64_t rc = Process(set, selector, option, nentries);
1886  SafeDelete(set);
1887 
1888  // Done
1889  return rc;
1890 }
1891 
1892 
1893 //------------------------------------------------------------------------------
1894 
1896 
1897 ////////////////////////////////////////////////////////////////////////////////
1898 /// Destructor.
1899 
1901 {
1902  SafeDelete(fOutput); // owns the output list
1903  SafeDelete(fOutputLists);
1904 
1905  // Objects stored in maps are already deleted when merging the feedback
1906  SafeDelete(fFeedbackLists);
1907  SafeDelete(fPacketizer);
1908 
1909  SafeDelete(fProcessMessage);
1910 }
1911 
1912 ////////////////////////////////////////////////////////////////////////////////
1913 /// Init the packetizer
1914 /// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1915 
1917  Long64_t first, const char *defpackunit,
1918  const char *defpackdata)
1919 {
1920  SafeDelete(fPacketizer);
1921  PDB(kGlobal,1) Info("Process","Enter");
1922  fDSet = dset;
1924 
1925  // This is done here to pickup on the fly changes
1926  Int_t honebyone = 1;
1927  if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1928  honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1929  fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1930 
1931  Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1932 
1933  TString packetizer;
1934  TList *listOfMissingFiles = 0;
1935 
1936  TMethodCall callEnv;
1937  TClass *cl;
1938  noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1939 
1940  if (noData) {
1941 
1942  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1943  packetizer = defpackunit;
1944  else
1945  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1946 
1947  // Get linked to the related class
1948  cl = TClass::GetClass(packetizer);
1949  if (cl == 0) {
1950  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1952  return -1;
1953  }
1954 
1955  // Init the constructor
1956  callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1957  if (!callEnv.IsValid()) {
1958  Error("InitPacketizer",
1959  "cannot find correct constructor for '%s'", cl->GetName());
1961  return -1;
1962  }
1963  callEnv.ResetParam();
1964  callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
1965  callEnv.SetParam((Long64_t) nentries);
1966  callEnv.SetParam((Long_t) fInput);
1967  callEnv.SetParam((Long_t) fProgressStatus);
1968 
1969  } else if (dset->TestBit(TDSet::kMultiDSet)) {
1970 
1971  // We have to process many datasets in one go, keeping them separate
1972  if (fProof->GetRunStatus() != TProof::kRunning) {
1973  // We have been asked to stop
1974  Error("InitPacketizer", "received stop/abort request");
1976  return -1;
1977  }
1978 
1979  // The multi packetizer
1980  packetizer = "TPacketizerMulti";
1981 
1982  // Get linked to the related class
1983  cl = TClass::GetClass(packetizer);
1984  if (cl == 0) {
1985  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1987  return -1;
1988  }
1989 
1990  // Init the constructor
1991  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1992  if (!callEnv.IsValid()) {
1993  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1995  return -1;
1996  }
1997  callEnv.ResetParam();
1998  callEnv.SetParam((Long_t) dset);
1999  callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
2000  callEnv.SetParam((Long64_t) first);
2001  callEnv.SetParam((Long64_t) nentries);
2002  callEnv.SetParam((Long_t) fInput);
2003  callEnv.SetParam((Long_t) fProgressStatus);
2004 
2005  // We are going to test validity during the packetizer initialization
2006  dset->SetBit(TDSet::kValidityChecked);
2007  dset->ResetBit(TDSet::kSomeInvalid);
2008 
2009  } else {
2010 
2011  // Lookup - resolve the end-point urls to optmize the distribution.
2012  // The lookup was previously called in the packetizer's constructor.
2013  // A list for the missing files may already have been added to the
2014  // output list; otherwise, if needed it will be created inside
2015  if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2016  // Move it to the output list
2017  fInput->Remove(listOfMissingFiles);
2018  } else {
2019  listOfMissingFiles = new TList;
2020  }
2021  // Do the lookup; we only skip it if explicitly requested so.
2022  TString lkopt;
2023  if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2024  dset->Lookup(kTRUE, &listOfMissingFiles);
2025 
2026  if (fProof->GetRunStatus() != TProof::kRunning) {
2027  // We have been asked to stop
2028  Error("InitPacketizer", "received stop/abort request");
2030  return -1;
2031  }
2032 
2033  if (!(dset->GetListOfElements()) ||
2034  !(dset->GetListOfElements()->GetSize())) {
2035  if (gProofServ)
2036  gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2037  Error("InitPacketizer", "No files from the data set were found - Aborting");
2039  if (listOfMissingFiles) {
2040  listOfMissingFiles->SetOwner();
2041  fOutput->Remove(listOfMissingFiles);
2042  SafeDelete(listOfMissingFiles);
2043  }
2044  return -1;
2045  }
2046 
2047  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2048  // Using standard packetizer TAdaptivePacketizer
2049  packetizer = defpackdata;
2050  else
2051  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2052 
2053  // Get linked to the related class
2054  cl = TClass::GetClass(packetizer);
2055  if (cl == 0) {
2056  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2058  return -1;
2059  }
2060 
2061  // Init the constructor
2062  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2063  if (!callEnv.IsValid()) {
2064  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2066  return -1;
2067  }
2068  callEnv.ResetParam();
2069  callEnv.SetParam((Long_t) dset);
2070  callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
2071  callEnv.SetParam((Long64_t) first);
2072  callEnv.SetParam((Long64_t) nentries);
2073  callEnv.SetParam((Long_t) fInput);
2074  callEnv.SetParam((Long_t) fProgressStatus);
2075 
2076  // We are going to test validity during the packetizer initialization
2077  dset->SetBit(TDSet::kValidityChecked);
2078  dset->ResetBit(TDSet::kSomeInvalid);
2079  }
2080 
2081  // Get an instance of the packetizer
2082  Long_t ret = 0;
2083  callEnv.Execute(ret);
2084  if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2085  Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2087  return -1;
2088  }
2089 
2090  if (!fPacketizer->IsValid()) {
2091  Error("InitPacketizer",
2092  "instantiated packetizer object '%s' is invalid", cl->GetName());
2094  SafeDelete(fPacketizer);
2095  return -1;
2096  }
2097 
2098  // In multi mode retrieve the list of missing files
2099  if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2100  if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2101  // Remove it; it will be added to the output list
2102  fInput->Remove(listOfMissingFiles);
2103  }
2104  }
2105 
2106  if (!noData) {
2107  // Add invalid elements to the list of missing elements
2108  TDSetElement *elem = 0;
2109  if (dset->TestBit(TDSet::kSomeInvalid)) {
2110  TIter nxe(dset->GetListOfElements());
2111  while ((elem = (TDSetElement *)nxe())) {
2112  if (!elem->GetValid()) {
2113  if (!listOfMissingFiles)
2114  listOfMissingFiles = new TList;
2115  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2116  dset->Remove(elem, kFALSE);
2117  }
2118  }
2119  // The invalid elements have been removed
2121  }
2122 
2123  // Record the list of missing or invalid elements in the output list
2124  if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2125  TIter missingFiles(listOfMissingFiles);
2126  TString msg;
2127  if (gDebug > 0) {
2128  TFileInfo *fi = 0;
2129  while ((fi = (TFileInfo *) missingFiles.Next())) {
2130  if (fi->GetCurrentUrl()) {
2131  msg = Form("File not found: %s - skipping!",
2132  fi->GetCurrentUrl()->GetUrl());
2133  } else {
2134  msg = Form("File not found: %s - skipping!", fi->GetName());
2135  }
2137  }
2138  }
2139  // Make sure it will be sent back
2140  if (!GetOutput("MissingFiles")) {
2141  listOfMissingFiles->SetName("MissingFiles");
2142  AddOutputObject(listOfMissingFiles);
2143  }
2144  TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2145  if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2146 
2147  // Estimate how much data are missing
2148  Int_t ngood = dset->GetListOfElements()->GetSize();
2149  Int_t nbad = listOfMissingFiles->GetSize();
2150  Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2151  msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2152  " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2153  tmpStatus->Add(msg.Data());
2154  msg = Form(" +++\n"
2155  " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2156  " the 'MissingFiles' list\n"
2157  " +++", xb * 100., '%', nbad, nbad + ngood);
2159  } else {
2160  // Cleanup
2161  SafeDelete(listOfMissingFiles);
2162  }
2163  }
2164 
2165  // Done
2166  return 0;
2167 }
2168 
2169 ////////////////////////////////////////////////////////////////////////////////
2170 /// Process specified TDSet on PROOF.
2171 /// This method is called on client and on the PROOF master.
2172 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2173 /// in case of success.
2174 
2175 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2176  Option_t *option, Long64_t nentries,
2177  Long64_t first)
2178 {
2179  PDB(kGlobal,1) Info("Process", "Enter");
2180 
2181  fDSet = dset;
2183 
2184  if (!fProgressStatus) {
2185  Error("Process", "No progress status");
2186  return -1;
2187  }
2189 
2190  // delete fOutput;
2191  if (!fOutput)
2192  fOutput = new THashList;
2193  else
2194  fOutput->Clear();
2195 
2196  SafeDelete(fFeedbackLists);
2197 
2198  if (fProof->IsMaster()){
2200  } else {
2202  }
2203 
2204  TStopwatch elapsed;
2205 
2206  // Define filename
2207  TString fn;
2208  fSelectorFileName = selector_file;
2209 
2210  if (fCreateSelObj) {
2211  if(!SendSelector(selector_file)) return -1;
2212  fn = gSystem->BaseName(selector_file);
2213  } else {
2214  fn = selector_file;
2215  }
2216 
2217  TMessage mesg(kPROOF_PROCESS);
2218 
2219  // Parse option
2220  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2221 
2222  TList *inputtmp = 0; // List of temporary input objects
2223  TDSet *set = dset;
2224  if (fProof->IsMaster()) {
2225 
2226  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2227  set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2228  dset->GetDirectory() );
2229  if (dset->TestBit(TDSet::kEmpty))
2230  set->SetBit(TDSet::kEmpty);
2231 
2232  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2233  Error("Process", "cannot init the packetizer");
2235  return -1;
2236  }
2237 
2238  // Reset start, this is now managed by the packetizer
2239  first = 0;
2240 
2241  // Negative memlogfreq disable checks.
2242  // If 0 is passed we try to have 100 messages about memory
2243  // Otherwise we use the frequency passed.
2244  Int_t mrc = -1;
2245  Long64_t memlogfreq = -1, mlf;
2246  if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2247  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2248  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2249  }
2250  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2251  if (memlogfreq == 0) {
2252  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2253  if (memlogfreq <= 0) memlogfreq = 1;
2254  }
2255  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2256 
2257 
2258  // Send input data, if any
2259  TString emsg;
2260  if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2261  Warning("Process", "could not forward input data: %s", emsg.Data());
2262 
2263  // Attach to the transient histogram with the assigned packets, if required
2264  if (fInput->FindObject("PROOF_StatsHist") != 0) {
2265  if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2266  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2267  } else {
2268  PDB(kLoop,1)
2269  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2270  " packets being processed");
2271  }
2272  }
2273 
2274  } else {
2275 
2276  // Check whether we have to enforce the use of submergers
2277  if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2278  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2279  if (smg >= 0) {
2280  fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2281  if (gEnv->Lookup("Proof.MergersByHost")) {
2282  Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2283  if (mbh != 0) {
2284  // Administrator settings have the priority
2285  TObject *o = 0;
2286  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2287  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2288  }
2289  }
2290  }
2291  }
2292 
2293  // For a new query clients should make sure that the temporary
2294  // output list is empty
2295  if (fOutputLists) {
2296  fOutputLists->Delete();
2297  delete fOutputLists;
2298  fOutputLists = 0;
2299  }
2300 
2301  if (!sync) {
2302  gSystem->RedirectOutput(fProof->fLogFileName);
2303  Printf(" ");
2304  Info("Process","starting new query");
2305  }
2306 
2307  // Define fSelector in Client if processing with filename
2308  if (fCreateSelObj) {
2310  if (!(fSelector = TSelector::GetSelector(selector_file))) {
2311  if (!sync)
2312  gSystem->RedirectOutput(0);
2313  return -1;
2314  }
2315  }
2316 
2317  fSelectorClass = 0;
2318  fSelectorClass = fSelector->IsA();
2319 
2320  // Add fSelector to inputlist if processing with object
2321  if (!fCreateSelObj) {
2322  // In any input list was set into the selector move it to the PROOF
2323  // input list, because we do not want to stream the selector one
2324  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2325  TIter nxi(fSelector->GetInputList());
2326  TObject *o = 0;
2327  while ((o = nxi())) {
2328  if (!fInput->FindObject(o)) {
2329  fInput->Add(o);
2330  if (!inputtmp) {
2331  inputtmp = new TList;
2332  inputtmp->SetOwner(kFALSE);
2333  }
2334  inputtmp->Add(o);
2335  }
2336  }
2337  }
2338  fInput->Add(fSelector);
2339  }
2340  // Set the input list for initialization
2342  fSelector->SetOption(option);
2344 
2345  PDB(kLoop,1) Info("Process","Call Begin(0)");
2346  fSelector->Begin(0);
2347 
2348  // Reset the input list to avoid double streaming and related problems (saving
2349  // the TQueryResult)
2351 
2352  // Send large input data objects, if any
2353  fProof->SendInputDataFile();
2354 
2355  if (!sync)
2356  gSystem->RedirectOutput(0);
2357  }
2358 
2359  TCleanup clean(this);
2360  SetupFeedback();
2361 
2362  TString opt = option;
2363 
2364  // Old servers need a dedicated streamer
2365  if (fProof->fProtocol < 13)
2366  dset->SetWriteV3(kTRUE);
2367 
2368  // Workers will get the entry ranges from the packetizer
2369  Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
2370  Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2371 
2372  // Entry- or Event- list ?
2373  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2374  : (TEntryList *)0;
2375  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2376  : (TEventList *)0;
2377  if (fProof->fProtocol > 14) {
2378  if (fProcessMessage) delete fProcessMessage;
2379  fProcessMessage = new TMessage(kPROOF_PROCESS);
2380  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2382  } else {
2383  mesg << set << fn << fInput << opt << num << fst << evl << sync;
2384  if (enl)
2385  // Not supported remotely
2386  Warning("Process","entry lists not supported by the server");
2387  }
2388 
2389  // Reset the merging progress information
2390  fProof->ResetMergePrg();
2391 
2392  Int_t nb = fProof->Broadcast(mesg);
2393  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2394  if (fProof->IsLite()) fProof->fNotIdle += nb;
2395 
2396  // Reset streamer choice
2397  if (fProof->fProtocol < 13)
2398  dset->SetWriteV3(kFALSE);
2399 
2400  // Redirect logs from master to special log frame
2401  if (IsClient())
2402  fProof->fRedirLog = kTRUE;
2403 
2404  if (!IsClient()){
2405  // Signal the start of finalize for the memory log grepping
2406  Info("Process|Svc", "Start merging Memory information");
2407  }
2408 
2409  if (!sync) {
2410  if (IsClient()) {
2411  // Asynchronous query: just make sure that asynchronous input
2412  // is enabled and return the prompt
2413  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2414  " activating CollectInputFrom");
2415  fProof->Activate();
2416 
2417  // Receive the acknowledgement and query sequential number
2418  fProof->Collect();
2419 
2420  return fProof->fSeqNum;
2421 
2422  } else {
2423  PDB(kGlobal,1) Info("Process","Calling Collect");
2424  fProof->Collect();
2425 
2426  HandleTimer(0); // force an update of final result
2427  // This forces a last call to TPacketizer::HandleTimer via the second argument
2428  // (the first is ignored). This is needed when some events were skipped so that
2429  // the total number of entries is not the one requested. The packetizer has no
2430  // way in such a case to understand that processing is finished: it must be told.
2431  if (fPacketizer) {
2432  fPacketizer->StopProcess(kFALSE, kTRUE);
2433  // The progress timer will now stop itself at the next call
2434  fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
2435  // Store process info
2436  elapsed.Stop();
2437  if (fQuery)
2438  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2439  fPacketizer->GetInitTime(),
2440  elapsed.RealTime());
2441  }
2442  StopFeedback();
2443 
2444  return Finalize(kFALSE,sync);
2445  }
2446  } else {
2447 
2448  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2449  fProof->Collect();
2450  if (!(fProof->IsSync())) {
2451  // The server required to switch to asynchronous mode
2452  Info("Process", "switching to the asynchronous mode ...");
2453  return fProof->fSeqNum;
2454  }
2455 
2456  // Restore prompt logging, for clients (Collect leaves things as they were
2457  // at the time it was called)
2458  if (IsClient())
2459  fProof->fRedirLog = kFALSE;
2460 
2461  if (!IsClient()) {
2462  // Force an update of final result
2463  HandleTimer(0);
2464  // This forces a last call to TPacketizer::HandleTimer via the second argument
2465  // (the first is ignored). This is needed when some events were skipped so that
2466  // the total number of entries is not the one requested. The packetizer has no
2467  // way in such a case to understand that processing is finished: it must be told.
2468  if (fPacketizer) {
2469  fPacketizer->StopProcess(kFALSE, kTRUE);
2470  // The progress timer will now stop itself at the next call
2471  fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
2472  // Store process info
2473  if (fQuery)
2474  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2475  fPacketizer->GetInitTime(),
2476  fPacketizer->GetProcTime());
2477  }
2478  } else {
2479  // Set the input list: maybe required at termination
2481  }
2482  StopFeedback();
2483 
2484  Long64_t rc = -1;
2486  rc = Finalize(kFALSE,sync);
2487 
2488  // Remove temporary input objects, if any
2489  if (inputtmp) {
2490  TIter nxi(inputtmp);
2491  TObject *o = 0;
2492  while ((o = nxi())) fInput->Remove(o);
2493  SafeDelete(inputtmp);
2494  }
2495 
2496  // Done
2497  return rc;
2498  }
2499 }
2500 
2501 ////////////////////////////////////////////////////////////////////////////////
2502 /// Process specified TDSet on PROOF.
2503 /// This method is called on client and on the PROOF master.
2504 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2505 /// in case of success.
2506 
2508  Option_t *option, Long64_t nentries,
2509  Long64_t first)
2510 {
2511  if (!selector) {
2512  Error("Process", "selector object undefined");
2513  return -1;
2514  }
2515 
2516  // Define fSelector in Client
2517  if (IsClient() && (selector != fSelector)) {
2519  fSelector = selector;
2520  }
2521 
2523  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2524  fCreateSelObj = kTRUE;
2525 
2526  // Done
2527  return rc;
2528 }
2529 
2530 ////////////////////////////////////////////////////////////////////////////////
2531 /// Prepares the given list of new workers to join a progressing process.
2532 /// Returns kTRUE on success, kFALSE otherwise.
2533 
2535 {
2536  if (!fProcessMessage || !fProof || !fPacketizer) {
2537  Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2538  fProcessMessage, fProof, fPacketizer);
2539  return kFALSE;
2540  }
2541 
2542  if (!workers || !fProof->IsMaster()) {
2543  Error("Process", "Invalid call");
2544  return kFALSE;
2545  }
2546 
2547  PDB(kGlobal, 1)
2548  Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2549 
2550  // Sends the file associated to the TSelector, if necessary
2551  if (fCreateSelObj) {
2552  PDB(kGlobal, 2)
2553  Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2554  if(!SendSelector(fSelectorFileName.Data())) {
2555  Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2556  return kFALSE;
2557  }
2558  }
2559 
2560  if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2561 
2562  PDB(kGlobal, 2)
2563  Info("Process", "Adding new workers to the packetizer");
2564  if (fPacketizer->AddWorkers(workers) == -1) {
2565  Error("Process", "Cannot add new workers to the packetizer!");
2566  return kFALSE; // TODO: make new wrks inactive
2567  }
2568 
2569  PDB(kGlobal, 2)
2570  Info("Process", "Broadcasting process message to new workers");
2571  fProof->Broadcast(*fProcessMessage, workers);
2572 
2573  // Don't call Collect(): we came here from a global Collect() already which
2574  // will take care of new workers as well
2575 
2576  return kTRUE;
2577 
2578 }
2579 
2580 ////////////////////////////////////////////////////////////////////////////////
2581 /// Merge output in files
2582 
2584 {
2585  PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2586  PDB(kOutput,2) fOutput->ls();
2587 
2588  TList *rmList = 0;
2589  if (fMergeFiles) {
2590  TIter nxo(fOutput);
2591  TObject *o = 0;
2592  TProofOutputFile *pf = 0;
2593  while ((o = nxo())) {
2594  if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2595 
2596  PDB(kOutput,2) pf->Print();
2597 
2598  if (pf->IsMerge()) {
2599 
2600  // Point to the merger
2601  Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2602  TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2603  if (!filemerger) {
2604  Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2605  pf->Print();
2606  continue;
2607  }
2608  // If only one instance the list in the merger is not yet created: do it now
2609  if (!pf->IsMerged()) {
2610  PDB(kOutput,2) pf->Print();
2611  TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2612  filemerger->AddFile(fileLoc);
2613  }
2614  // Datadir
2615  TString ddir, ddopts;
2616  if (gProofServ) {
2617  ddir.Form("%s/", gProofServ->GetDataDir());
2619  }
2620  // Set the output file
2621  TString outfile(pf->GetOutputFileName());
2622  if (outfile.Contains("<datadir>/")) {
2623  outfile.ReplaceAll("<datadir>/", ddir.Data());
2624  if (!ddopts.IsNull())
2625  outfile += TString::Format("?%s", ddopts.Data());
2626  pf->SetOutputFileName(outfile);
2627  }
2628  if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2630  TString srv;
2632  TUrl usrv(srv);
2633  Bool_t localFile = kFALSE;
2634  if (pf->IsRetrieve()) {
2635  // This file will be retrieved by the client: we created it in the data dir
2636  // and save the file URL on the client in the title
2637  if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2638  TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2639  // The output file path on the master
2640  outfile.Form("%s%s", ddir.Data(), bn.Data());
2641  // Save the client path in the title if not defined yet
2642  if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2643  // The file is local
2644  localFile = kTRUE;
2645  } else {
2646  // Check if the file is on the master or elsewhere
2647  if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2648  // Check locality
2649  TUrl uof(outfile.Data(), kTRUE);
2650  TString lfn;
2651  ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2652  if (ftyp == TFile::kLocal && !srv.IsNull()) {
2653  // Check if is a different server
2654  if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2655  usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2656  }
2657  // If it is really local set the file name
2658  if (ftyp == TFile::kLocal) outfile = lfn;
2659  // The file maybe local
2660  if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2661  }
2662  // The remote output file name (the one to be used by the client)
2663  TString outfilerem(outfile);
2664  // For local files we add the local server
2665  if (localFile) {
2666  // Remove prefix, if any, if included and if Xrootd
2667  TProofServ::FilterLocalroot(outfilerem, srv);
2668  outfilerem.Insert(0, srv);
2669  }
2670  // Save the new remote output filename
2671  pf->SetOutputFileName(outfilerem);
2672  // Align the filename
2673  pf->SetFileName(gSystem->BaseName(outfilerem));
2674  }
2675  if (!filemerger->OutputFile(outfile)) {
2676  Error("MergeOutputFiles", "cannot open the output file");
2677  continue;
2678  }
2679  // Merge
2680  PDB(kSubmerger,2) filemerger->PrintFiles("");
2681  if (!filemerger->Merge()) {
2682  Error("MergeOutputFiles", "cannot merge the output files");
2683  continue;
2684  }
2685  // Remove the files
2686  TList *fileList = filemerger->GetMergeList();
2687  if (fileList) {
2688  TIter next(fileList);
2689  TObjString *url = 0;
2690  while((url = (TObjString*)next())) {
2691  TUrl u(url->GetName());
2692  if (!strcmp(u.GetProtocol(), "file")) {
2693  gSystem->Unlink(u.GetFile());
2694  } else {
2695  gSystem->Unlink(url->GetName());
2696  }
2697  }
2698  }
2699  // Reset the merger
2700  filemerger->Reset();
2701 
2702  } else {
2703 
2704  // If not yet merged (for example when having only 1 active worker,
2705  // we need to create the dataset by calling Merge on an effectively empty list
2706  if (!pf->IsMerged()) {
2707  TList dumlist;
2708  dumlist.Add(new TNamed("dum", "dum"));
2709  dumlist.SetOwner(kTRUE);
2710  pf->Merge(&dumlist);
2711  }
2712  // Point to the dataset
2714  if (!fc) {
2715  Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2716  pf->Print();
2717  continue;
2718  }
2719  // Add the collection to the output list for registration and/or to be returned
2720  // to the client
2721  fOutput->Add(fc);
2722  // Do not cleanup at destruction
2723  pf->ResetFileCollection();
2724  // Tell the main thread to register this dataset, if needed
2725  if (pf->IsRegister()) {
2726  TString opt;
2727  if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2728  if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2729  if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2730  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2731  TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2732  fOutput->Add(new TNamed(tag, opt));
2733  }
2734  // Remove this object from the output list and schedule it for distruction
2735  fOutput->Remove(pf);
2736  if (!rmList) rmList = new TList;
2737  rmList->Add(pf);
2738  PDB(kOutput,2) fOutput->Print();
2739  }
2740  }
2741  }
2742  }
2743 
2744  // Remove objects scheduled for removal
2745  if (rmList && rmList->GetSize() > 0) {
2746  TIter nxo(rmList);
2747  TObject *o = 0;
2748  while((o = nxo())) {
2749  fOutput->Remove(o);
2750  }
2751  rmList->SetOwner(kTRUE);
2752  delete rmList;
2753  }
2754 
2755  PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2756 
2757  // Done
2758  return kTRUE;
2759 }
2760 
2761 
2762 ////////////////////////////////////////////////////////////////////////////////
2763 /// Set the selector's data members:
2764 /// find the mapping of data members to otuput list entries in the output list
2765 /// and apply it.
2766 
2768 {
2771  if (!olsdm) {
2772  PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2773  "failed to find map object in output list!");
2774  return;
2775  }
2776 
2777  olsdm->SetDataMembers(fSelector);
2778 }
2779 
2780 ////////////////////////////////////////////////////////////////////////////////
2781 
2783 {
2784  // Finalize a query.
2785  // Returns -1 in case of an error, 0 otherwise.
2786 
2787  if (IsClient()) {
2788  if (fOutputLists == 0) {
2789  if (force)
2790  if (fQuery)
2791  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2792  fQuery->GetName()), force);
2793  } else {
2794  // Make sure the all objects are in the output list
2795  PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2796  MergeOutput();
2797  }
2798  }
2799 
2800  Long64_t rv = 0;
2801  if (fProof->IsMaster()) {
2802 
2803  // Fill information for monitoring and stop it
2804  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2805  if (!status) {
2806  // The query was aborted: let's add some info in the output list
2807  status = new TStatus();
2808  fOutput->Add(status);
2809  TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2810  status->Add(emsg);
2811  }
2812  status->SetExitStatus((Int_t) GetExitStatus());
2813 
2814  PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2815  // Some objects (e.g. histos in autobin) may not have been merged yet
2816  // do it now
2817  MergeOutput();
2818 
2819  fOutput->SetOwner();
2820 
2821  // Add the active-wrks-vs-proctime info from the packetizer
2822  if (fPacketizer) {
2823  TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
2824  if (pperf) fOutput->Add(pperf);
2825  TList *parms = fPacketizer->GetConfigParams(kTRUE);
2826  if (parms) {
2827  TIter nxo(parms);
2828  TObject *o = 0;
2829  while ((o = nxo())) fOutput->Add(o);
2830  }
2831 
2832  // If other invalid elements were found during processing, add them to the
2833  // list of missing elements
2834  TDSetElement *elem = 0;
2835  if (fPacketizer->GetFailedPackets()) {
2836  TString type = (fPacketizer->TestBit(TVirtualPacketizer::kIsTree)) ? "TTree" : "";
2837  TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2838  if (!listOfMissingFiles) {
2839  listOfMissingFiles = new TList;
2840  listOfMissingFiles->SetName("MissingFiles");
2841  }
2842  TIter nxe(fPacketizer->GetFailedPackets());
2843  while ((elem = (TDSetElement *)nxe()))
2844  listOfMissingFiles->Add(elem->GetFileInfo(type));
2845  if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2846  }
2847  }
2848 
2849  TPerfStats::Stop();
2850  // Save memory usage on master
2851  Long_t vmaxmst, rmaxmst;
2852  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2853  status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2854 
2856 
2857  } else {
2858  if (fExitStatus != kAborted) {
2859 
2860  if (!sync) {
2861  // Reinit selector (with multi-sessioning we must do this until
2862  // TSelector::GetSelector() is optimized to i) avoid reloading of an
2863  // unchanged selector and ii) invalidate existing instances of
2864  // reloaded selector)
2865  if (ReinitSelector(fQuery) == -1) {
2866  Info("Finalize", "problems reinitializing selector \"%s\"",
2867  fQuery->GetSelecImp()->GetName());
2868  return -1;
2869  }
2870  }
2871 
2872  if (fPacketizer)
2873  if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2874  fPacketizer->SetFailedPackets(0);
2875  failedPackets->SetName("FailedPackets");
2876  AddOutputObject(failedPackets);
2877 
2878  TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2879  if (!status) AddOutputObject((status = new TStatus()));
2880  status->Add("Some packets were not processed! Check the the"
2881  " 'FailedPackets' list in the output list");
2882  }
2883 
2884  // Some input parameters may be needed in Terminate
2886 
2888  if (output) {
2889  TIter next(fOutput);
2890  while(TObject* obj = next()) {
2891  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2892  // Either parallel or not a canvas or not able to display it:
2893  // just add to the list
2894  output->Add(obj);
2895  }
2896  } else {
2897  Warning("Finalize", "undefined output list in the selector! Protocol error?");
2898  }
2899 
2900  // We need to do this because the output list can be modified in TSelector::Terminate
2901  // in a way to invalidate existing objects; so we clean the links when still valid and
2902  // we re-copy back later
2904  fOutput->Clear("nodelete");
2905 
2906  // Map output objects to selector members
2907  SetSelectorDataMembersFromOutputList();
2908 
2909  PDB(kLoop,1) Info("Finalize","Call Terminate()");
2910  // This is the end of merging
2911  SetMerging(kFALSE);
2912  // We measure the merge time
2913  fProof->fQuerySTW.Reset();
2914  // Call Terminate now
2915  fSelector->Terminate();
2916 
2917  rv = fSelector->GetStatus();
2918 
2919  // Copy the output list back and clean the selector's list
2920  TIter it(output);
2921  while(TObject* o = it()) {
2922  fOutput->Add(o);
2923  }
2924 
2925  // Save the output list in the current query, if any
2926  if (fQuery) {
2928  // Set in finalized state (cannot be done twice)
2929  fQuery->SetFinalized();
2930  } else {
2931  Warning("Finalize","current TQueryResult object is undefined!");
2932  }
2933 
2934  if (!fCreateSelObj) {
2937  if (output) output->Remove(fSelector);
2938  fSelector = 0;
2939  }
2940 
2941  // We have transferred copy of the output objects in TQueryResult,
2942  // so now we can cleanup the selector, making sure that we do not
2943  // touch the output objects
2944  if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2946 
2947  // Delete fOutput (not needed anymore, cannot be finalized twice),
2948  // making sure that the objects saved in TQueryResult are not deleted
2950  fOutput->Clear("nodelete");
2952 
2953  } else {
2954 
2955  // Cleanup
2956  fOutput->SetOwner();
2958  if (!fCreateSelObj) fSelector = 0;
2959  }
2960  }
2961  PDB(kGlobal,1) Info("Process","exit");
2962 
2963  if (!IsClient()) {
2964  Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2965  }
2966  fProof->FinalizationDone();
2967 
2968  return rv;
2969 }
2970 
2971 ////////////////////////////////////////////////////////////////////////////////
2972 /// Finalize the results of a query already processed.
2973 
2975 {
2976  PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2977 
2978  if (!IsClient()) {
2979  Info("Finalize(TQueryResult *)",
2980  "method to be executed only on the clients");
2981  return -1;
2982  }
2983 
2984  if (!qr) {
2985  Info("Finalize(TQueryResult *)", "query undefined");
2986  return -1;
2987  }
2988 
2989  if (qr->IsFinalized()) {
2990  Info("Finalize(TQueryResult *)", "query already finalized");
2991  return -1;
2992  }
2993 
2994  // Reset the list
2995  if (!fOutput)
2996  fOutput = new THashList;
2997  else
2998  fOutput->Clear();
2999 
3000  // Make sure that the temporary output list is empty
3001  if (fOutputLists) {
3002  fOutputLists->Delete();
3003  delete fOutputLists;
3004  fOutputLists = 0;
3005  }
3006 
3007  // Re-init the selector
3008  gSystem->RedirectOutput(fProof->fLogFileName);
3009 
3010  // Import the output list
3011  TList *tmp = (TList *) qr->GetOutputList();
3012  if (!tmp) {
3013  gSystem->RedirectOutput(0);
3014  Info("Finalize(TQueryResult *)", "outputlist is empty");
3015  return -1;
3016  }
3017  TList *out = fOutput;
3018  if (fProof->fProtocol < 11)
3019  out = new TList;
3020  TIter nxo(tmp);
3021  TObject *o = 0;
3022  while ((o = nxo()))
3023  out->Add(o->Clone());
3024 
3025  // Adopts the list
3026  if (fProof->fProtocol < 11) {
3027  out->SetOwner();
3028  StoreOutput(out);
3029  }
3030  gSystem->RedirectOutput(0);
3031 
3032  SetSelectorDataMembersFromOutputList();
3033 
3034  // Finalize it
3035  SetCurrentQuery(qr);
3036  Long64_t rc = Finalize();
3038 
3039  return rc;
3040 }
3041 
3042 ////////////////////////////////////////////////////////////////////////////////
3043 /// Send the selector file(s) to master or worker nodes.
3044 
3045 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
3046 {
3047  // Check input
3048  if (!selector_file) {
3049  Info("SendSelector", "Invalid input: selector (file) name undefined");
3050  return kFALSE;
3051  }
3052 
3053  if (!strchr(gSystem->BaseName(selector_file), '.')) {
3054  if (gDebug > 1)
3055  Info("SendSelector", "selector name '%s' does not contain a '.':"
3056  " nothing to send, it will be loaded from a library", selector_file);
3057  return kTRUE;
3058  }
3059 
3060  // Extract the fine name first
3061  TString selec = selector_file;
3062  TString aclicMode;
3063  TString arguments;
3064  TString io;
3065  selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3066 
3067  // Expand possible envs or '~'
3068  gSystem->ExpandPathName(selec);
3069 
3070  // Update the macro path
3072  TString np(gSystem->DirName(selec));
3073  if (!np.IsNull()) {
3074  np += ":";
3075  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3076  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3077  mp.Insert(ip, np);
3078  TROOT::SetMacroPath(mp);
3079  if (gDebug > 0)
3080  Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3081  }
3082  }
3083 
3084  // Header file
3085  TString header = selec;
3086  header.Remove(header.Last('.'));
3087  header += ".h";
3088  if (gSystem->AccessPathName(header, kReadPermission)) {
3089  TString h = header;
3090  header.Remove(header.Last('.'));
3091  header += ".hh";
3092  if (gSystem->AccessPathName(header, kReadPermission)) {
3093  Info("SendSelector",
3094  "header file not found: tried: %s %s", h.Data(), header.Data());
3095  return kFALSE;
3096  }
3097  }
3098 
3099  // Send files now
3100  if (fProof->SendFile(selec, (TProof::kBinary | TProof::kForward | TProof::kCp | TProof::kCpBin)) == -1) {
3101  Info("SendSelector", "problems sending implementation file %s", selec.Data());
3102  return kFALSE;
3103  }
3104  if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3105  Info("SendSelector", "problems sending header file %s", header.Data());
3106  return kFALSE;
3107  }
3108 
3109  return kTRUE;
3110 }
3111 
3112 ////////////////////////////////////////////////////////////////////////////////
3113 /// Merge objects in output the lists.
3114 
3116 {
3117  PDB(kOutput,1) Info("MergeOutput","Enter");
3118 
3119  TObject *obj = 0;
3120  if (fOutputLists) {
3121 
3122  TIter next(fOutputLists);
3123 
3124  TList *list;
3125  while ( (list = (TList *) next()) ) {
3126 
3127  if (!(obj = fOutput->FindObject(list->GetName()))) {
3128  obj = list->First();
3129  list->Remove(obj);
3130  fOutput->Add(obj);
3131  }
3132 
3133  if ( list->IsEmpty() ) continue;
3134 
3135  TMethodCall callEnv;
3136  if (obj->IsA())
3137  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3138  if (callEnv.IsValid()) {
3139  callEnv.SetParam((Long_t) list);
3140  callEnv.Execute(obj);
3141  } else {
3142  // No Merge interface, return individual objects
3143  while ( (obj = list->First()) ) {
3144  fOutput->Add(obj);
3145  list->Remove(obj);
3146  }
3147  }
3148  }
3149  SafeDelete(fOutputLists);
3150 
3151  } else {
3152 
3153  PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3154  }
3155 
3156  if (!IsClient() || fProof->IsLite()) {
3157  // Merge the output files created on workers, if any
3158  MergeOutputFiles();
3159  }
3160 
3161  // If there are TProofOutputFile objects we have to make sure that the internal
3162  // information is consistent for the cases where this object is going to be merged
3163  // again (e.g. when using submergers or in a multi-master setup). This may not be
3164  // the case because the first coming in is taken as reference and it has the
3165  // internal dir and raw dir of the originating worker.
3166  TString key;
3167  TNamed *nm = 0;
3168  TList rmlist;
3169  TIter nxo(fOutput);
3170  while ((obj = nxo())) {
3171  TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3172  if (pf) {
3173  if (gProofServ) {
3174  PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3175  TString dir(pf->GetOutputFileName());
3176  PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3177  // The dir
3178  if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3179  PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3180  pf->SetDir(dir);
3181  // The raw dir; for xrootd based system we include the 'localroot', if any
3182  TUrl u(dir);
3183  dir = u.GetFile();
3184  TString pfx = gEnv->GetValue("Path.Localroot","");
3185  if (!pfx.IsNull() &&
3186  (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3187  dir.Insert(0, pfx);
3188  PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3189  pf->SetDir(dir, kTRUE);
3190  // The worker ordinal
3192  // The saved output file name, if any
3193  key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3194  if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3195  pf->SetOutputFileName(nm->GetTitle());
3196  rmlist.Add(nm);
3198  pf->SetOutputFileName(0);
3200  }
3201  // The filename (order is important to exclude '.merger' from the key)
3202  dir = pf->GetFileName();
3204  dir += ".merger";
3205  pf->SetMerged(kFALSE);
3206  } else {
3207  if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3208  }
3209  pf->SetFileName(dir);
3210  } else if (fProof->IsLite()) {
3211  // The ordinal
3212  pf->SetWorkerOrdinal("0");
3213  // The dir
3214  pf->SetDir(gSystem->DirName(pf->GetOutputFileName()));
3215  // The filename and raw dir
3216  TUrl u(pf->GetOutputFileName(), kTRUE);
3217  pf->SetFileName(gSystem->BaseName(u.GetFile()));
3218  pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
3219  // Notify the output path
3220  Printf("\nOutput file: %s", pf->GetOutputFileName());
3221  }
3222  } else {
3223  PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3224  }
3225  }
3226 
3227  // Remove temporary objects from fOutput
3228  if (rmlist.GetSize() > 0) {
3229  TIter nxrm(&rmlist);
3230  while ((obj = nxrm()))
3231  fOutput->Remove(obj);
3232  rmlist.SetOwner(kTRUE);
3233  }
3234 
3235  // If requested (typically in case of submerger to count possible side-effects in that process)
3236  // save the measured memory usage
3237  if (saveMemValues) {
3238  TPerfStats::Stop();
3239  // Save memory usage on master
3240  Long_t vmaxmst, rmaxmst;
3241  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3242  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3243  if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3244  }
3245 
3246  PDB(kOutput,1) fOutput->Print();
3247  PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3248 }
3249 
3250 ////////////////////////////////////////////////////////////////////////////////
3251 /// Progress signal.
3252 
3254 {
3255  if (IsClient()) {
3256  fProof->Progress(total, processed);
3257  } else {
3258  // Send to the previous tier
3260  m << total << processed;
3261  gProofServ->GetSocket()->Send(m);
3262  }
3263 }
3264 
3265 ////////////////////////////////////////////////////////////////////////////////
3266 /// Progress signal.
3267 
3269  Long64_t bytesread,
3270  Float_t initTime, Float_t procTime,
3271  Float_t evtrti, Float_t mbrti)
3272 {
3273  PDB(kGlobal,1)
3274  Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3275  initTime, procTime, evtrti, mbrti);
3276 
3277  if (IsClient()) {
3278  fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3279  } else {
3280  // Send to the previous tier
3282  m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3283  gProofServ->GetSocket()->Send(m);
3284  }
3285 }
3286 
3287 ////////////////////////////////////////////////////////////////////////////////
3288 /// Progress signal.
3289 
3291 {
3292  if (pi) {
3293  PDB(kGlobal,1)
3294  Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3295  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3296  pi->fActWorkers, pi->fEffSessions);
3297 
3298  if (IsClient()) {
3299  fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3300  pi->fInitTime, pi->fProcTime,
3301  pi->fEvtRateI, pi->fMBRateI,
3302  pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3303  } else {
3304  // Send to the previous tier
3306  m << pi;
3307  gProofServ->GetSocket()->Send(m);
3308  }
3309  } else {
3310  Warning("Progress","TProofProgressInfo object undefined!");
3311  }
3312 }
3313 
3314 
3315 ////////////////////////////////////////////////////////////////////////////////
3316 /// Feedback signal.
3317 
3319 {
3320  fProof->Feedback(objs);
3321 }
3322 
3323 ////////////////////////////////////////////////////////////////////////////////
3324 /// Stop process after this event.
3325 
3327 {
3328  if (fPacketizer != 0)
3329  fPacketizer->StopProcess(abort, kFALSE);
3330  if (abort == kTRUE)
3332  else
3334 }
3335 
3336 ////////////////////////////////////////////////////////////////////////////////
3337 /// Incorporate the received object 'obj' into the output list fOutput.
3338 /// The latter is created if not existing.
3339 /// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3340 /// consumption.
3341 /// Returns -1 in case of error, 1 if the object has been merged into another
3342 /// one (so that its ownership has not been taken and can be deleted), and 0
3343 /// otherwise.
3344 
3346 {
3347  PDB(kOutput,1)
3348  Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3349 
3350  // We must something to process
3351  if (!obj) {
3352  PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3353  return -1;
3354  }
3355 
3356  // Create the output list, if not yet done
3357  if (!fOutput)
3358  fOutput = new THashList;
3359 
3360  // Flag about merging
3361  Bool_t merged = kTRUE;
3362 
3363  // Process event lists first
3364  TList *elists = dynamic_cast<TList *> (obj);
3365  if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3366 
3367  // Create a global event list, result of merging the event lists
3368  // coresponding to the various data set elements
3369  TEventList *evlist = new TEventList("PROOF_EventList");
3370 
3371  // Iterate the list of event list segments
3372  TIter nxevl(elists);
3373  TEventList *evl = 0;
3374  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3375 
3376  // Find the file offset (fDSet is the current TDSet instance)
3377  // locating the element by name
3378  TIter nxelem(fDSet->GetListOfElements());
3379  TDSetElement *elem = 0;
3380  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3381  if (!strcmp(elem->GetFileName(), evl->GetName()))
3382  break;
3383  }
3384  if (!elem) {
3385  Error("AddOutputObject", "Found an event list for %s, but no object with"
3386  " the same name in the TDSet", evl->GetName());
3387  continue;
3388  }
3389  Long64_t offset = elem->GetTDSetOffset();
3390 
3391  // Shift the list by the number of first event in that file
3392  Long64_t *arr = evl->GetList();
3393  Int_t num = evl->GetN();
3394  if (arr && offset > 0)
3395  for (Int_t i = 0; i < num; i++)
3396  arr[i] += offset;
3397 
3398  // Add to the global event list
3399  evlist->Add(evl);
3400  }
3401 
3402  // Incorporate the resulting global list in fOutput
3403  SetLastMergingMsg(evlist);
3404  Incorporate(evlist, fOutput, merged);
3405  NotifyMemory(evlist);
3406 
3407  // Delete the global list if merged
3408  if (merged)
3409  SafeDelete(evlist);
3410 
3411  // The original object has been transformed in something else; we do
3412  // not have ownership on it
3413  return 1;
3414  }
3415 
3416  // Check if we need to merge files
3417  TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3418  if (pf) {
3419  fMergeFiles = kTRUE;
3420  if (!IsClient() || fProof->IsLite()) {
3421  if (pf->IsMerge()) {
3422  Bool_t hasfout = (pf->GetOutputFileName() &&
3423  strlen(pf->GetOutputFileName()) > 0 &&
3425  Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3426  if (setfout) {
3427 
3428  TString ddir, ddopts;
3429  if (gProofServ) {
3430  ddir.Form("%s/", gProofServ->GetDataDir());
3431  if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3432  }
3433  // Set the output file
3434  TString outfile(pf->GetOutputFileName());
3435  outfile.ReplaceAll("<datadir>/", ddir.Data());
3436  if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3437  pf->SetOutputFileName(outfile);
3438 
3439  if (gProofServ) {
3440  // If submerger, save first the existing filename, if any
3441  if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3442  TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3443  if (!fOutput->FindObject(key.Data()))
3444  fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3445  }
3446  TString of;
3448  if (of.IsNull()) {
3449  // Assume an xroot server running on the machine
3450  of.Form("root://%s/", gSystem->HostName());
3451  if (gSystem->Getenv("XRDPORT")) {
3452  TString sp(gSystem->Getenv("XRDPORT"));
3453  if (sp.IsDigit())
3454  of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3455  }
3456  }
3457  TString sessionPath(gProofServ->GetSessionDir());
3458  TProofServ::FilterLocalroot(sessionPath, of);
3459  of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3461  if (!of.EndsWith(".merger")) of += ".merger";
3462  } else {
3463  if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3464  }
3465  pf->SetOutputFileName(of);
3466  }
3467  }
3468  // Notify
3469  PDB(kOutput, 1) pf->Print();
3470  }
3471  } else {
3472  // On clients notify the output path
3473  Printf("Output file: %s", pf->GetOutputFileName());
3474  }
3475  }
3476 
3477  // For other objects we just run the incorporation procedure
3478  SetLastMergingMsg(obj);
3479  Incorporate(obj, fOutput, merged);
3480  NotifyMemory(obj);
3481 
3482  // We are done
3483  return (merged ? 1 : 0);
3484 }
3485 
3486 ////////////////////////////////////////////////////////////////////////////////
3487 /// Control output redirection to TProof::fLogFileW
3488 
3490 {
3491  if (on && fProof && fProof->fLogFileW) {
3492  TProofServ::SetErrorHandlerFile(fProof->fLogFileW);
3493  fErrorHandler = SetErrorHandler(TProofServ::ErrorHandler);
3494  } else if (!on) {
3495  if (fErrorHandler) {
3497  SetErrorHandler(fErrorHandler);
3498  }
3499  }
3500 }
3501 
3502 ////////////////////////////////////////////////////////////////////////////////
3503 /// Incorporate the content of the received output list 'out' into the final
3504 /// output list fOutput. The latter is created if not existing.
3505 /// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3506 /// consumption.
3507 
3509 {
3510  PDB(kOutput,1) Info("AddOutput","Enter");
3511 
3512  // We must something to process
3513  if (!out) {
3514  PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3515  return;
3516  }
3517 
3518  // Create the output list, if not yet done
3519  if (!fOutput)
3520  fOutput = new THashList;
3521 
3522  // Process event lists first
3523  Bool_t merged = kTRUE;
3524  TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3525  if (elists) {
3526 
3527  // Create a global event list, result of merging the event lists
3528  // corresponding to the various data set elements
3529  TEventList *evlist = new TEventList("PROOF_EventList");
3530 
3531  // Iterate the list of event list segments
3532  TIter nxevl(elists);
3533  TEventList *evl = 0;
3534  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3535 
3536  // Find the file offset (fDSet is the current TDSet instance)
3537  // locating the element by name
3538  TIter nxelem(fDSet->GetListOfElements());
3539  TDSetElement *elem = 0;
3540  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3541  if (!strcmp(elem->GetFileName(), evl->GetName()))
3542  break;
3543  }
3544  if (!elem) {
3545  Error("AddOutput", "Found an event list for %s, but no object with"
3546  " the same name in the TDSet", evl->GetName());
3547  continue;
3548  }
3549  Long64_t offset = elem->GetTDSetOffset();
3550 
3551  // Shift the list by the number of first event in that file
3552  Long64_t *arr = evl->GetList();
3553  Int_t num = evl->GetN();
3554  if (arr && offset > 0)
3555  for (Int_t i = 0; i < num; i++)
3556  arr[i] += offset;
3557 
3558  // Add to the global event list
3559  evlist->Add(evl);
3560  }
3561 
3562  // Remove and delete the events lists object to avoid spoiling iteration
3563  // during next steps
3564  out->Remove(elists);
3565  delete elists;
3566 
3567  // Incorporate the resulting global list in fOutput
3568  SetLastMergingMsg(evlist);
3569  Incorporate(evlist, fOutput, merged);
3570  NotifyMemory(evlist);
3571  }
3572 
3573  // Iterate on the remaining objects in the received list
3574  TIter nxo(out);
3575  TObject *obj = 0;
3576  while ((obj = nxo())) {
3577  SetLastMergingMsg(obj);
3578  Incorporate(obj, fOutput, merged);
3579  // If not merged, drop from the temporary list, as the ownership
3580  // passes to fOutput
3581  if (!merged)
3582  out->Remove(obj);
3583  NotifyMemory(obj);
3584  }
3585 
3586  // Done
3587  return;
3588 }
3589 
3590 ////////////////////////////////////////////////////////////////////////////////
3591 /// Printout the memory record after merging object 'obj'
3592 /// This record is used by the memory monitor
3593 
3595 {
3596  if (fProof && (!IsClient() || fProof->IsLite())){
3597  ProcInfo_t pi;
3598  if (!gSystem->GetProcInfo(&pi)){
3599  // For PROOF-Lite we redirect this output to a the open log file so that the
3600  // memory monitor can pick these messages up
3601  RedirectOutput(fProof->IsLite());
3602  Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3603  pi.fMemVirtual, pi.fMemResident, obj->GetName());
3604  RedirectOutput(0);
3605  }
3606  // Record also values for monitoring
3608  }
3609 }
3610 
3611 ////////////////////////////////////////////////////////////////////////////////
3612 /// Set the message to be notified in case of exception
3613 
3615 {
3616  TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3617  TProofServ::SetLastMsg(lastMsg);
3618 }
3619 
3620 ////////////////////////////////////////////////////////////////////////////////
3621 /// Incorporate object 'newobj' in the list 'outlist'.
3622 /// The object is merged with an object of the same name already existing in
3623 /// the list, or just added.
3624 /// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3625 /// this happens if the Merge() method does not exist or if a object named as 'obj'
3626 /// is not already in the list. If the obj is not 'merged' than it should not be
3627 /// deleted, unless outlist is not owner of its objects.
3628 /// Return 0 on success, -1 on error.
3629 
3631 {
3632  merged = kTRUE;
3633 
3634  PDB(kOutput,1)
3635  Info("Incorporate", "enter: obj: %p (%s), list: %p",
3636  newobj, newobj ? newobj->ClassName() : "undef", outlist);
3637 
3638  // The object and list must exist
3639  if (!newobj || !outlist) {
3640  Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3641  return -1;
3642  }
3643 
3644  // Special treatment for histograms in autobin mode
3645  Bool_t specialH =
3646  (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
3647  if (specialH && newobj->InheritsFrom(TH1::Class())) {
3648  if (!HandleHistogram(newobj, merged)) {
3649  if (merged) {
3650  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3651  } else {
3652  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3653  " appropriate list for delayed merging", newobj->GetName());
3654  }
3655  return 0;
3656  }
3657  }
3658 
3659  // Check if an object with the same name exists already
3660  TObject *obj = outlist->FindObject(newobj->GetName());
3661 
3662  // If no, add the new object and return
3663  if (!obj) {
3664  outlist->Add(newobj);
3665  merged = kFALSE;
3666  // Done
3667  return 0;
3668  }
3669 
3670  // Locate the Merge(TCollection *) method
3671  TMethodCall callEnv;
3672  if (obj->IsA())
3673  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3674  if (callEnv.IsValid()) {
3675  // Found: put the object in a one-element list
3676  static TList *xlist = new TList;
3677  xlist->Add(newobj);
3678  // Call the method
3679  callEnv.SetParam((Long_t) xlist);
3680  callEnv.Execute(obj);
3681  // Ready for next call
3682  xlist->Clear();
3683  } else {
3684  // Not found: return individual objects
3685  outlist->Add(newobj);
3686  merged = kFALSE;
3687  }
3688 
3689  // Done
3690  return 0;
3691 }
3692 
3693 ////////////////////////////////////////////////////////////////////////////////
3694 /// Low statistic histograms need a special treatment when using autobin
3695 
3697 {
3698  TH1 *h = dynamic_cast<TH1 *>(obj);
3699  if (!h) {
3700  // Not an histo
3701  return obj;
3702  }
3703 
3704  // This is only used if we return (TObject *)0 and there is only one case
3705  // when we set this to kTRUE
3706  merged = kFALSE;
3707 
3708  // Does is still needs binning ?
3709  Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3710 
3711  // Number of entries
3712  Int_t nent = h->GetBufferLength();
3713  PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3714  h->GetName(), nent, h->GetBufferSize());
3715 
3716  // Attach to the list in the outputlists, if any
3717  TList *list = 0;
3718  if (!fOutputLists) {
3719  PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3720  fOutputLists = new TList;
3721  fOutputLists->SetOwner();
3722  }
3723  list = (TList *) fOutputLists->FindObject(h->GetName());
3724 
3725  TH1 *href = 0;
3726  if (tobebinned) {
3727 
3728  // The histogram needs to be projected in a reasonable range: we
3729  // do this at the end with all the histos, so we need to create
3730  // a list here
3731  if (!list) {
3732  // Create the list
3733  list = new TList;
3734  list->SetName(h->GetName());
3735  list->SetOwner();
3736  fOutputLists->Add(list);
3737  // Move in it any previously merged object from the output list
3738  if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3739  fOutput->Remove(href);
3740  list->Add(href);
3741  }
3742  }
3743  TIter nxh(list);
3744  while ((href = (TH1 *) nxh())) {
3745  if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3746  }
3747  if (href) {
3748  list->AddBefore(href, h);
3749  } else {
3750  list->Add(h);
3751  }
3752  // Done
3753  return (TObject *)0;
3754 
3755  } else {
3756 
3757  if (list) {
3758  TIter nxh(list);
3759  while ((href = (TH1 *) nxh())) {
3760  if (href->GetBuffer() || href->GetEntries() < nent) break;
3761  }
3762  if (href) {
3763  list->AddBefore(href, h);
3764  } else {
3765  list->Add(h);
3766  }
3767  // Done
3768  return (TObject *)0;
3769 
3770  } else {
3771  // Check if we can 'Add' the histogram to an existing one; this is more efficient
3772  // then using Merge
3773  TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3774  if (hout) {
3775  // Remove the existing histo from the output list ...
3776  fOutput->Remove(hout);
3777  // ... and create either the list to merge in one-go at the end
3778  // (more efficient than merging one by one) or, if too big, merge
3779  // these two and start the 'one-by-one' technology
3780  Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3781  if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
3782  list = new TList;
3783  list->Add(hout);
3784  h->Merge(list);
3785  list->SetOwner();
3786  delete list;
3787  return h;
3788  } else {
3789  list = new TList;
3790  list->SetName(h->GetName());
3791  list->SetOwner();
3792  fOutputLists->Add(list);
3793  // Add the existing and the incoming histos
3794  list->Add(hout);
3795  list->Add(h);
3796  // Done
3797  return (TObject *)0;
3798  }
3799  } else {
3800  // This is the first one; add it to the output list
3801  fOutput->Add(h);
3802  return (TObject *)0;
3803  }
3804  }
3805  }
3806  PDB(kOutput,1) Info("HandleHistogram", "leaving");
3807 }
3808 
3809 ////////////////////////////////////////////////////////////////////////////////
3810 /// Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges
3811 /// on the axis (i.e. if they can be just Add-ed for merging).
3812 
3814 {
3815  Bool_t rc = kFALSE;
3816  if (!h0 || !h1) return rc;
3817 
3818  TAxis *a0 = 0, *a1 = 0;
3819 
3820  // Check X
3821  a0 = h0->GetXaxis();
3822  a1 = h1->GetXaxis();
3823  if (a0->GetNbins() == a1->GetNbins())
3824  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3825  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3826 
3827  // Check Y, if needed
3828  if (h0->GetDimension() > 1) {
3829  rc = kFALSE;
3830  a0 = h0->GetYaxis();
3831  a1 = h1->GetYaxis();
3832  if (a0->GetNbins() == a1->GetNbins())
3833  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3834  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3835  }
3836 
3837  // Check Z, if needed
3838  if (h0->GetDimension() > 2) {
3839  rc = kFALSE;
3840  a0 = h0->GetZaxis();
3841  a1 = h1->GetZaxis();
3842  if (a0->GetNbins() == a1->GetNbins())
3843  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3844  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3845  }
3846 
3847  // Done
3848  return rc;
3849 }
3850 
3851 ////////////////////////////////////////////////////////////////////////////////
3852 /// Store received output list.
3853 
3855 {
3856  PDB(kOutput,1) Info("StoreOutput","Enter");
3857 
3858  if ( out == 0 ) {
3859  PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3860  return;
3861  }
3862 
3863  TIter next(out);
3864  out->SetOwner(kFALSE); // take ownership of the contents
3865 
3866  if (fOutputLists == 0) {
3867  PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3868  fOutputLists = new TList;
3869  fOutputLists->SetOwner();
3870  }
3871  // process eventlists first
3872  TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3873  if (lists) {
3874  out->Remove(lists);
3875  TEventList *mainList = new TEventList("PROOF_EventList");
3876  out->Add(mainList);
3877  TIter it(lists);
3878  TEventList *aList;
3879  while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3880  // find file offset
3881  TIter nxe(fDSet->GetListOfElements());
3882  TDSetElement *elem;
3883  while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3884  if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3885  break;
3886  }
3887  if (!elem) {
3888  Error("StoreOutput", "found the EventList for %s, but no object with that name "
3889  "in the TDSet", aList->GetName());
3890  continue;
3891  }
3892  Long64_t offset = elem->GetTDSetOffset();
3893 
3894  // shift the list by the number of first event in that file
3895  Long64_t *arr = aList->GetList();
3896  Int_t num = aList->GetN();
3897  if (arr && offset)
3898  for (int i = 0; i < num; i++)
3899  arr[i] += offset;
3900 
3901  mainList->Add(aList); // add to the main list
3902  }
3903  delete lists;
3904  }
3905 
3906  TObject *obj;
3907  while( (obj = next()) ) {
3908  PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3909 
3910  TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3911  if ( list == 0 ) {
3912  PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3913  list = new TList;
3914  list->SetName( obj->GetName() );
3915  list->SetOwner();
3916  fOutputLists->Add( list );
3917  }
3918  list->Add( obj );
3919  }
3920 
3921  delete out;
3922  PDB(kOutput,1) Info("StoreOutput", "leave");
3923 }
3924 
3925 ////////////////////////////////////////////////////////////////////////////////
3926 /// Merge feedback lists.
3927 
3929 {
3930  PDB(kFeedback,1)
3931  Info("MergeFeedback","Enter");
3932 
3933  if ( fFeedbackLists == 0 ) {
3934  PDB(kFeedback,1)
3935  Info("MergeFeedback","Leave (no output)");
3936  return 0;
3937  }
3938 
3939  TList *fb = new TList; // collection of feedback objects
3940  fb->SetOwner();
3941 
3942  TIter next(fFeedbackLists);
3943 
3944  TMap *map;
3945  while ( (map = (TMap*) next()) ) {
3946 
3947  PDB(kFeedback,2)
3948  Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3949 
3950  // turn map into list ...
3951 
3952  TList *list = new TList;
3953  TIter keys(map);
3954 
3955 #ifndef R__TH1MERGEFIXED
3956  Int_t nbmx = -1;
3957  TObject *oref = 0;
3958 #endif
3959  while ( TObject *key = keys() ) {
3960  TObject *o = map->GetValue(key);
3961  TH1 *h = dynamic_cast<TH1 *>(o);
3962 #ifndef R__TH1MERGEFIXED
3963  // Temporary fix for to cope with the problem in TH1::Merge.
3964  // We need to use a reference histo the one with the largest number
3965  // of bins so that the histos from all submasters can be correctly
3966  // fit in
3967  if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3968  if (h->GetNbinsX() > nbmx) {
3969  nbmx= h->GetNbinsX();
3970  oref = o;
3971  }
3972  }
3973 #endif
3974  if (h) {
3975  TIter nxh(list);
3976  TH1 *href= 0;
3977  while ((href = (TH1 *)nxh())) {
3978  if (h->GetBuffer()) {
3979  if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3980  } else {
3981  if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3982  }
3983  }
3984  if (href) {
3985  list->AddBefore(href, h);
3986  } else {
3987  list->Add(h);
3988  }
3989  } else {
3990  list->Add(o);
3991  }
3992  }
3993 
3994  // clone first object, remove from list
3995 #ifdef R__TH1MERGEFIXED
3996  TObject *obj = list->First();
3997 #else
3998  TObject *obj = (oref) ? oref : list->First();
3999 #endif
4000  list->Remove(obj);
4001  obj = obj->Clone();
4002  fb->Add(obj);
4003 
4004  if ( list->IsEmpty() ) {
4005  delete list;
4006  continue;
4007  }
4008 
4009  // merge list with clone
4010  TMethodCall callEnv;
4011  if (obj->IsA())
4012  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4013  if (callEnv.IsValid()) {
4014  callEnv.SetParam((Long_t) list);
4015  callEnv.Execute(obj);
4016  } else {
4017  // No Merge interface, return copy of individual objects
4018  while ( (obj = list->First()) ) {
4019  fb->Add(obj->Clone());
4020  list->Remove(obj);
4021  }
4022  }
4023 
4024  delete list;
4025  }
4026 
4027  PDB(kFeedback,1)
4028  Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4029 
4030  return fb;
4031 }
4032 
4033 ////////////////////////////////////////////////////////////////////////////////
4034 /// Store feedback results from the specified slave.
4035 
4037 {
4038  PDB(kFeedback,1)
4039  Info("StoreFeedback","Enter");
4040 
4041  if ( out == 0 ) {
4042  PDB(kFeedback,1)
4043  Info("StoreFeedback","Leave (empty)");
4044  return;
4045  }
4046 
4047  if ( IsClient() ) {
4048  // in client
4049  Feedback(out);
4050  delete out;
4051  return;
4052  }
4053 
4054  if (fFeedbackLists == 0) {
4055  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4056  fFeedbackLists = new TList;
4057  fFeedbackLists->SetOwner();
4058  }
4059 
4060  TIter next(out);
4061  out->SetOwner(kFALSE); // take ownership of the contents
4062 
4063  const char *ord = ((TSlave*) slave)->GetOrdinal();
4064 
4065  TObject *obj;
4066  while( (obj = next()) ) {
4067  PDB(kFeedback,2)
4068  Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4069  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4070  if ( map == 0 ) {
4071  PDB(kFeedback,2)
4072  Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4073  // Map must not be owner (ownership is with regards to the keys (only))
4074  map = new TMap;
4075  map->SetName(obj->GetName());
4076  fFeedbackLists->Add(map);
4077  } else {
4078  PDB(kFeedback,2)
4079  Info("StoreFeedback","%s: removing previous value", ord);
4080  if (map->GetValue(slave))
4081  delete map->GetValue(slave);
4082  map->Remove(slave);
4083  }
4084  map->Add(slave, obj);
4085  PDB(kFeedback,2)
4086  Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4087  }
4088 
4089  delete out;
4090  PDB(kFeedback,1)
4091  Info("StoreFeedback","Leave");
4092 }
4093 
4094 ////////////////////////////////////////////////////////////////////////////////
4095 /// Setup reporting of feedback objects.
4096 
4098 {
4099  if (IsClient()) return; // Client does not need timer
4100 
4101  fFeedback = (TList*) fInput->FindObject("FeedbackList");
4102 
4103  PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4104  fFeedback == 0 ? "NOT ":"");
4105 
4106  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4107 
4108  // OK, feedback was requested, setup the timer
4110  fFeedbackPeriod = 2000;
4111  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4112  fFeedbackTimer = new TTimer;
4113  fFeedbackTimer->SetObject(this);
4115 }
4116 
4117 ////////////////////////////////////////////////////////////////////////////////
4118 /// Stop reporting of feedback objects.
4119 
4121 {
4122  if (fFeedbackTimer == 0) return;
4123 
4124  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4125 
4127 }
4128 
4129 ////////////////////////////////////////////////////////////////////////////////
4130 /// Send feedback objects to client.
4131 
4133 {
4134  PDB(kFeedback,2) Info("HandleTimer","Entry");
4135 
4136  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4137 
4138  // process local feedback objects
4139 
4140  TList *fb = new TList;
4141  fb->SetOwner();
4142 
4143  TIter next(fFeedback);
4144  while( TObjString *name = (TObjString*) next() ) {
4145  TObject *o = fOutput->FindObject(name->GetName());
4146  if (o != 0) {
4147  fb->Add(o->Clone());
4148  // remove the corresponding entry from the feedback list
4149  TMap *m = 0;
4150  if (fFeedbackLists &&
4151  (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4152  fFeedbackLists->Remove(m);
4153  m->DeleteValues();
4154  delete m;
4155  }
4156  }
4157  }
4158 
4159  if (fb->GetSize() > 0) {
4160  StoreFeedback(this, fb); // adopts fb
4161  } else {
4162  delete fb;
4163  }
4164 
4165  if (fFeedbackLists == 0) {
4166  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4167  return kFALSE;
4168  }
4169 
4170  fb = MergeFeedback();
4171 
4172  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4173 
4175  m << fb;
4176 
4177  // send message to client;
4178  gProofServ->GetSocket()->Send(m);
4179 
4180  delete fb;
4181 
4183 
4184  return kFALSE; // ignored?
4185 }
4186 
4187 ////////////////////////////////////////////////////////////////////////////////
4188 /// Get next packet for specified slave.
4189 
4191 {
4192  // The first call to this determines the end of initialization
4193  SetInitTime();
4194 
4195  if (fProcPackets) {
4196  Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4197  if (bin >= 0) {
4198  if (fProcPackets->GetBinContent(bin) > 0)
4199  fProcPackets->Fill(slave->GetOrdinal(), -1);
4200  }
4201  }
4202 
4203  TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4204 
4205  if (e == 0) {
4206  PDB(kPacketizer,2)
4207  Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4208  } else if (e == (TDSetElement*) -1) {
4209  PDB(kPacketizer,2)
4210  Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4211  } else {
4212  PDB(kPacketizer,2)
4213  Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4214  slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4215  e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4216  if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4217  }
4218 
4219  return e;
4220 }
4221 
4222 ////////////////////////////////////////////////////////////////////////////////
4223 /// Is the player running on the client?
4224 
4226 {
4227  return fProof ? fProof->TestBit(TProof::kIsClient) : kFALSE;
4228 }
4229 
4230 ////////////////////////////////////////////////////////////////////////////////
4231 /// Draw (support for TChain::Draw()).
4232 /// Returns -1 in case of error or number of selected events in case of success.
4233 
4235  const char *selection, Option_t *option,
4236  Long64_t nentries, Long64_t firstentry)
4237 {
4238  if (!fgDrawInputPars) {
4239  fgDrawInputPars = new THashList;
4240  fgDrawInputPars->Add(new TObjString("FeedbackList"));
4241  fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4242  fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4243  fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4244  fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4245  fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4246  fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4247  fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4248  fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4249  fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4250  fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4251  }
4252 
4253  TString selector, objname;
4254  if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4255  Error("DrawSelect", "parsing arguments");
4256  return -1;
4257  }
4258 
4259  TNamed *varexpobj = new TNamed("varexp", varexp);
4260  TNamed *selectionobj = new TNamed("selection", selection);
4261 
4262  // Save the current input list
4263  TObject *o = 0;
4264  TList *savedInput = new TList;
4265  TIter nxi(fInput);
4266  while ((o = nxi())) {
4267  savedInput->Add(o);
4268  TString n(o->GetName());
4269  if (fgDrawInputPars &&
4271  !n.BeginsWith("alias:")) fInput->Remove(o);
4272  }
4273 
4274  fInput->Add(varexpobj);
4275  fInput->Add(selectionobj);
4276 
4277  // Make sure we have an object name
4278  if (objname == "") objname = "htemp";
4279 
4280  fProof->AddFeedback(objname);
4281  Long64_t r = Process(set, selector, option, nentries, firstentry);
4282  fProof->RemoveFeedback(objname);
4283 
4284  fInput->Remove(varexpobj);
4285  fInput->Remove(selectionobj);
4286  if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4287  fInput->Remove(opt);
4288  delete opt;
4289  }
4290 
4291  delete varexpobj;
4292  delete selectionobj;
4293 
4294  // Restore the input list
4295  fInput->Clear();
4296  TIter nxsi(savedInput);
4297  while ((o = nxsi()))
4298  fInput->Add(o);
4299  savedInput->SetOwner(kFALSE);
4300  delete savedInput;
4301 
4302  return r;
4303 }
4304 
4305 ////////////////////////////////////////////////////////////////////////////////
4306 /// Set init time
4307 
4309 {
4310  if (fPacketizer)
4311  fPacketizer->SetInitTime();
4312 }
4313 
4314 //------------------------------------------------------------------------------
4315 
4316 
4318 
4319 ////////////////////////////////////////////////////////////////////////////////
4320 /// Setup feedback.
4321 
4323 {
4324  TList *fb = (TList*) fInput->FindObject("FeedbackList");
4325  if (fb) {
4326  PDB(kFeedback,1)
4327  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4328  } else {
4329  PDB(kFeedback,1)
4330  Info("SetupFeedback","\"FeedbackList\" NOT found");
4331  }
4332 
4333  if (fb == 0 || fb->GetSize() == 0) return;
4334 
4335  // OK, feedback was requested, setup the timer
4336 
4338  fFeedbackPeriod = 2000;
4339  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4340  fFeedbackTimer = new TTimer;
4341  fFeedbackTimer->SetObject(this);
4343 
4344  fFeedback = fb;
4345 }
4346 
4347 ////////////////////////////////////////////////////////////////////////////////
4348 /// Stop feedback.
4349 
4351 {
4352  if (fFeedbackTimer == 0) return;
4353 
4354  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4355 
4357 }
4358 
4359 ////////////////////////////////////////////////////////////////////////////////
4360 /// Handle timer event.
4361 
4363 {
4364  PDB(kFeedback,2) Info("HandleTimer","Entry");
4365 
4366  // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4367  // so we also send the info to update the progress bar.
4368  if (gProofServ) {
4369  Bool_t sendm = kFALSE;
4371  if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4372  sendm = kTRUE;
4373  if (gProofServ->GetProtocol() > 25) {
4374  m << GetProgressStatus();
4375  } else if (gProofServ->GetProtocol() > 11) {
4377  m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4378  << (Float_t) -1. << (Float_t) ps->GetProcTime()
4379  << (Float_t) ps->GetRate() << (Float_t) -1.;
4380  } else {
4381  m << fTotalEvents << GetEventsProcessed();
4382  }
4383  }
4384  if (sendm) gProofServ->GetSocket()->Send(m);
4385  }
4386 
4387  if (fFeedback == 0) return kFALSE;
4388 
4389  TList *fb = new TList;
4390  fb->SetOwner(kFALSE);
4391 
4392  if (fOutput == 0) {
4394  }
4395 
4396  if (fOutput) {
4397  TIter next(fFeedback);
4398  while( TObjString *name = (TObjString*) next() ) {
4399  // TODO: find object in memory ... maybe allow only in fOutput ?
4400  TObject *o = fOutput->FindObject(name->GetName());
4401  if (o != 0) fb->Add(o);
4402  }
4403  }
4404 
4405  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4406 
4408  m << fb;
4409 
4410  // send message to client;
4411  gProofServ->GetSocket()->Send(m);
4412 
4413  delete fb;
4414 
4416 
4417  return kFALSE; // ignored?
4418 }
4419 
4420 ////////////////////////////////////////////////////////////////////////////////
4421 /// Handle tree header request.
4422 
4424 {
4426 
4427  TDSet *dset;
4428  (*mess) >> dset;
4429  dset->Reset();
4430  TDSetElement *e = dset->Next();
4431  Long64_t entries = 0;
4432  TFile *f = 0;
4433  TTree *t = 0;
4434  if (!e) {
4435  PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4436  } else {
4437  f = TFile::Open(e->GetFileName());
4438  t = 0;
4439  if (f) {
4440  t = (TTree*) f->Get(e->GetObjName());
4441  if (t) {
4442  t->SetMaxVirtualSize(0);
4443  t->DropBaskets();
4444  entries = t->GetEntries();
4445 
4446  // compute #entries in all the files
4447  while ((e = dset->Next()) != 0) {
4448  TFile *f1 = TFile::Open(e->GetFileName());
4449  if (f1) {
4450  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4451  if (t1) {
4452  entries += t1->GetEntries();
4453  delete t1;
4454  }
4455  delete f1;
4456  }
4457  }
4458  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4459  }
4460  }
4461  }
4462  if (t)
4463  answ << TString("Success") << t;
4464  else
4465  answ << TString("Failed") << t;
4466 
4467  fSocket->Send(answ);
4468 
4469  SafeDelete(t);
4470  SafeDelete(f);
4471 }
4472 
4473 
4474 //------------------------------------------------------------------------------
4475 
4477 
4478 ////////////////////////////////////////////////////////////////////////////////
4479 /// Process specified TDSet on PROOF. Runs on super master.
4480 /// The return value is -1 in case of error and TSelector::GetStatus() in
4481 /// in case of success.
4482 
4483 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4484  Option_t *option, Long64_t nentries,
4485  Long64_t first)
4486 {
4488  PDB(kGlobal,1) Info("Process","Enter");
4489 
4490  TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4491  if (!proof) return -1;
4492 
4493  delete fOutput;
4494  fOutput = new THashList;
4495 
4497 
4498  if (!SendSelector(selector_file)) {
4499  Error("Process", "sending selector %s", selector_file);
4500  return -1;
4501  }
4502 
4503  TCleanup clean(this);
4504  SetupFeedback();
4505 
4506  if (proof->IsMaster()) {
4507 
4508  // make sure the DSet is valid
4509  if (!dset->ElementsValid()) {
4510  proof->ValidateDSet(dset);
4511  if (!dset->ElementsValid()) {
4512  Error("Process", "could not validate TDSet");
4513  return -1;
4514  }
4515  }
4516 
4517  TList msds;
4518  msds.SetOwner(); // This will delete TPairs
4519 
4520  TList keyholder; // List to clean up key part of the pairs
4521  keyholder.SetOwner();
4522  TList valueholder; // List to clean up value part of the pairs
4523  valueholder.SetOwner();
4524 
4525  // Construct msd list using the slaves
4526  TIter nextslave(proof->GetListOfActiveSlaves());
4527  while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4528  TList *submasters = 0;
4529  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4530  if (!msd) {
4531  submasters = new TList;
4532  submasters->SetName(sl->GetMsd());
4533  keyholder.Add(submasters);
4534  TList *setelements = new TSortedList(kSortDescending);
4535  setelements->SetName(TString(sl->GetMsd())+"_Elements");
4536  valueholder.Add(setelements);
4537  msds.Add(new TPair(submasters, setelements));
4538  } else {
4539  submasters = dynamic_cast<TList*>(msd->Key());
4540  }
4541  if (submasters) submasters->Add(sl);
4542  }
4543 
4544  // Add TDSetElements to msd list
4545  Long64_t cur = 0; //start of next element
4546  TIter nextelement(dset->GetListOfElements());
4547  while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4548 
4549  if (elem->GetNum()<1) continue; // get rid of empty elements
4550 
4551  if (nentries !=-1 && cur>=first+nentries) {
4552  // we are done
4553  break;
4554  }
4555 
4556  if (cur+elem->GetNum()-1<first) {
4557  //element is before first requested entry
4558  cur+=elem->GetNum();
4559  continue;
4560  }
4561 
4562  if (cur<first) {
4563  //modify element to get proper start
4564  elem->SetNum(elem->GetNum()-(first-cur));
4565  elem->SetFirst(elem->GetFirst()+first-cur);
4566  cur=first;
4567  }
4568 
4569  if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4570  cur+=elem->GetNum();
4571  } else {
4572  //modify element to get proper end
4573  elem->SetNum(first+nentries-cur);
4574  cur=first+nentries;
4575  }
4576 
4577  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4578  if (!msd) {
4579  Error("Process", "data requires mass storage domain '%s'"
4580  " which is not accessible in this proof session",
4581  elem->GetMsd());
4582  return -1;
4583  } else {
4584  TList *elements = dynamic_cast<TList*>(msd->Value());
4585  if (elements) elements->Add(elem);
4586  }
4587  }
4588 
4589  TList usedmasters;
4590  TIter nextmsd(msds.MakeIterator());
4591  while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4592  TList *submasters = dynamic_cast<TList*>(msd->Key());
4593  TList *setelements = dynamic_cast<TList*>(msd->Value());
4594 
4595  // distribute elements over the masters
4596  Int_t nmasters = submasters ? submasters->GetSize() : -1;
4597  Int_t nelements = setelements ? setelements->GetSize() : -1;
4598  for (Int_t i=0; i<nmasters; i++) {
4599 
4600  Long64_t nent = 0;
4601  TDSet set(dset->GetType(), dset->GetObjName(),
4602  dset->GetDirectory());
4603  for (Int_t j = (i*nelements)/nmasters;
4604  j < ((i+1)*nelements)/nmasters;
4605  j++) {
4606  TDSetElement *elem = setelements ?
4607  dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4608  if (elem) {
4609  set.Add(elem->GetFileName(), elem->GetObjName(),
4610  elem->GetDirectory(), elem->GetFirst(),
4611  elem->GetNum(), elem->GetMsd());
4612  nent += elem->GetNum();
4613  } else {
4614  Warning("Process", "not a TDSetElement object");
4615  }
4616  }
4617 
4618  if (set.GetListOfElements()->GetSize()>0) {
4619  TMessage mesg(kPROOF_PROCESS);
4620  TString fn(gSystem->BaseName(selector_file));
4621  TString opt = option;
4622  mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4623 
4624  TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4625  if (sl) {
4626  PDB(kGlobal,1) Info("Process",
4627  "Sending TDSet with %d elements to submaster %s",
4628  set.GetListOfElements()->GetSize(),
4629  sl->GetOrdinal());
4630  sl->GetSocket()->Send(mesg);
4631  usedmasters.Add(sl);
4632 
4633  // setup progress info
4634  fSlaves.AddLast(sl);
4635  fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
4636  fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
4637  fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
4638  fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4639  fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
4640  fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
4641  fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
4642  fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
4643  fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
4644  fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
4645  fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
4646  fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4647  fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
4648  fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4649  fSlaveActW.Set(fSlaveActW.GetSize()+1);
4650  fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4651  fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
4652  fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4653  fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
4654  fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4655  } else {
4656  Warning("Process", "not a TSlave object");
4657  }
4658  }
4659  }
4660  }
4661 
4662  if ( !IsClient() ) HandleTimer(0);
4663  PDB(kGlobal,1) Info("Process","Calling Collect");
4664  proof->Collect(&usedmasters);
4665  HandleTimer(0);
4666 
4667  }
4668 
4669  StopFeedback();
4670 
4671  PDB(kGlobal,1) Info("Process","Calling Merge Output");
4672  MergeOutput();
4673 
4674  TPerfStats::Stop();
4675 
4676  return 0;
4677 }
4678 
4679 ////////////////////////////////////////////////////////////////////////////////
4680 /// Report progress.
4681 
4683 {
4684  Int_t idx = fSlaves.IndexOf(sl);
4685  fSlaveProgress[idx] = processed;
4686  if (fSlaveTotals[idx] != total)
4687  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4688  fSlaveTotals[idx] = total;
4689 
4690  Long64_t tot = 0;
4691  Int_t i;
4692  for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4693  Long64_t proc = 0;
4694  for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4695 
4696  Progress(tot, proc);
4697 }
4698 
4699 ////////////////////////////////////////////////////////////////////////////////
4700 /// Report progress.
4701 
4703  Long64_t processed, Long64_t bytesread,
4704  Float_t initTime, Float_t procTime,
4705  Float_t evtrti, Float_t mbrti)
4706 {
4707  PDB(kGlobal,2)
4708  Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4709  processed, bytesread, initTime, procTime, evtrti, mbrti);
4710 
4711  Int_t idx = fSlaves.IndexOf(sl);
4712  if (fSlaveTotals[idx] != total)
4713  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4714  fSlaveTotals[idx] = total;
4715  fSlaveProgress[idx] = processed;
4716  fSlaveBytesRead[idx] = bytesread;
4717  fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4718  fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4719  fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4720  fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4721 
4722  Int_t i;
4723  Long64_t tot = 0;
4724  Long64_t proc = 0;
4725  Long64_t bytes = 0;
4726  Float_t init = -1.;
4727  Float_t ptime = -1.;
4728  Float_t erti = 0.;
4729  Float_t srti = 0.;
4730  Int_t nerti = 0;
4731  Int_t nsrti = 0;
4732  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4733  tot += fSlaveTotals[i];
4734  if (i < fSlaveProgress.GetSize())
4735  proc += fSlaveProgress[i];
4736  if (i < fSlaveBytesRead.GetSize())
4737  bytes += fSlaveBytesRead[i];
4738  if (i < fSlaveInitTime.GetSize())
4739  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4740  init = fSlaveInitTime[i];
4741  if (i < fSlaveProcTime.GetSize())
4742  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4743  ptime = fSlaveProcTime[i];
4744  if (i < fSlaveEvtRti.GetSize())
4745  if (fSlaveEvtRti[i] > -1.) {
4746  erti += fSlaveEvtRti[i];
4747  nerti++;
4748  }
4749  if (i < fSlaveMBRti.GetSize())
4750  if (fSlaveMBRti[i] > -1.) {
4751  srti += fSlaveMBRti[i];
4752  nsrti++;
4753  }
4754  }
4755  srti = (nsrti > 0) ? srti / nerti : 0.;
4756 
4757  Progress(tot, proc, bytes, init, ptime, erti, srti);
4758 }
4759 
4760 ////////////////////////////////////////////////////////////////////////////////
4761 /// Progress signal.
4762 
4764 {
4765  if (pi) {
4766  PDB(kGlobal,2)
4767  Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4768  pi->fTotal, pi->fProcessed, pi->fBytesRead,
4769  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4770  pi->fActWorkers, pi->fEffSessions);
4771 
4772  Int_t idx = fSlaves.IndexOf(wrk);
4773  if (fSlaveTotals[idx] != pi->fTotal)
4774  Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4775  fSlaveTotals[idx] = pi->fTotal;
4776  fSlaveProgress[idx] = pi->fProcessed;
4777  fSlaveBytesRead[idx] = pi->fBytesRead;
4778  fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4779  fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4780  fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4781  fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4782  fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4783  fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4784  fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4785 
4786  Int_t i;
4787  Int_t nerti = 0;
4788  Int_t nsrti = 0;
4789  TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4790  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4791  pisum.fTotal += fSlaveTotals[i];
4792  if (i < fSlaveProgress.GetSize())
4793  pisum.fProcessed += fSlaveProgress[i];
4794  if (i < fSlaveBytesRead.GetSize())
4795  pisum.fBytesRead += fSlaveBytesRead[i];
4796  if (i < fSlaveInitTime.GetSize())
4797  if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4798  pisum.fInitTime = fSlaveInitTime[i];
4799  if (i < fSlaveProcTime.GetSize())
4800  if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4801  pisum.fProcTime = fSlaveProcTime[i];
4802  if (i < fSlaveEvtRti.GetSize())
4803  if (fSlaveEvtRti[i] > -1.) {
4804  pisum.fEvtRateI += fSlaveEvtRti[i];
4805  nerti++;
4806  }
4807  if (i < fSlaveMBRti.GetSize())
4808  if (fSlaveMBRti[i] > -1.) {
4809  pisum.fMBRateI += fSlaveMBRti[i];
4810  nsrti++;
4811  }
4812  if (i < fSlaveActW.GetSize())
4813  pisum.fActWorkers += fSlaveActW[i];
4814  if (i < fSlaveTotS.GetSize())
4815  if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4816  pisum.fTotSessions = fSlaveTotS[i];
4817  if (i < fSlaveEffS.GetSize())
4818  if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4819  pisum.fEffSessions = fSlaveEffS[i];
4820  }
4821  pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4822 
4823  Progress(&pisum);
4824  }
4825 }
4826 
4827 ////////////////////////////////////////////////////////////////////////////////
4828 /// Send progress and feedback to client.
4829 
4831 {
4832  if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4833 
4834  Int_t i;
4835  Long64_t tot = 0;
4836  Long64_t proc = 0;
4837  Long64_t bytes = 0;
4838  Float_t init = -1.;
4839  Float_t ptime = -1.;
4840  Float_t erti = 0.;
4841  Float_t srti = 0.;
4842  Int_t nerti = 0;
4843  Int_t nsrti = 0;
4844  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4845  tot += fSlaveTotals[i];
4846  if (i < fSlaveProgress.GetSize())
4847  proc += fSlaveProgress[i];
4848  if (i < fSlaveBytesRead.GetSize())
4849  bytes += fSlaveBytesRead[i];
4850  if (i < fSlaveInitTime.GetSize())
4851  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4852  init = fSlaveInitTime[i];
4853  if (i < fSlaveProcTime.GetSize())
4854  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4855  ptime = fSlaveProcTime[i];
4856  if (i < fSlaveEvtRti.GetSize())
4857  if (fSlaveEvtRti[i] > -1.) {
4858  erti += fSlaveEvtRti[i];
4859  nerti++;
4860  }
4861  if (i < fSlaveMBRti.GetSize())
4862  if (fSlaveMBRti[i] > -1.) {
4863  srti += fSlaveMBRti[i];
4864  nsrti++;
4865  }
4866  }
4867  erti = (nerti > 0) ? erti / nerti : 0.;
4868  srti = (nsrti > 0) ? srti / nerti : 0.;
4869 
4871  if (gProofServ->GetProtocol() > 25) {
4872  // Fill the message now
4873  TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4874  erti, srti, -1,
4876  m << &pi;
4877  } else {
4878 
4879  m << tot << proc << bytes << init << ptime << erti << srti;
4880  }
4881 
4882  // send message to client;
4883  gProofServ->GetSocket()->Send(m);
4884 
4885  if (fReturnFeedback)
4887  else
4888  return kFALSE;
4889 }
4890 
4891 ////////////////////////////////////////////////////////////////////////////////
4892 /// Setup reporting of feedback objects and progress messages.
4893 
4895 {
4896  if (IsClient()) return; // Client does not need timer
4897 
4899 
4900  if (fFeedbackTimer) {
4901  fReturnFeedback = kTRUE;
4902  return;
4903  } else {
4904  fReturnFeedback = kFALSE;
4905  }
4906 
4907  // setup the timer for progress message
4909  fFeedbackPeriod = 2000;
4910  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4911  fFeedbackTimer = new TTimer;
4912  fFeedbackTimer->SetObject(this);
4914 }
Int_t fTotSessions
Definition: TProof.h:201
virtual void SetMerging(Bool_t=kTRUE)
Definition: TProofPlayer.h:233
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
TList * GetOutputList()
Definition: TQueryResult.h:139
TClass * GetClass() const
Definition: TMessage.h:76
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:929
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Long64_t Process(const char *selector, Long64_t nentries=-1, Option_t *option="")
Process the specified TSelector file &#39;nentries&#39; times.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Definition: TObject.cxx:830
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1266
Long64_t fTotal
Definition: TProof.h:193
static FILE * SetErrorHandlerFile(FILE *ferr)
Set the file stream where to log (default stderr).
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
ErrorHandlerFunc_t SetErrorHandler(ErrorHandlerFunc_t newhandler)
Set an errorhandler function. Returns the old handler.
Definition: TError.cxx:106
void SetMerging(Bool_t on=kTRUE)
Switch on/off merge timer.
An array of TObjects.
Definition: TObjArray.h:39
TProofProgressStatus * fProgressStatus
Definition: TProofPlayer.h:95
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:212
float xmin
Definition: THbookFile.cxx:93
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
void ValidateDSet(TDSet *dset)
Validate a TDSet.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
Internal class steering processing in PROOF.
Definition: TProofPlayer.h:78
void SetWorkerOrdinal(const char *ordinal)
long long Long64_t
Definition: RtypesCore.h:69
EExitStatus fExitStatus
status of query in progress
Definition: TProofPlayer.h:93
void SetNumMergers(Int_t nmergers)
Definition: TQueryResult.h:109
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:394
Int_t fPhysRam
Definition: TSystem.h:169
Int_t AssertSelector(const char *selector_file)
Make sure that a valid selector object Return -1 in case of problems, 0 otherwise.
const char * GetCacheDir() const
Definition: TProofServ.h:262
R__EXTERN Int_t gErrorIgnoreLevel
Definition: TError.h:107
void StoreOutput(TList *out)
Store output list (may not be used in this class).
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:866
void Print(Option_t *opt="") const
Print query content. Use opt = "F" for a full listing.
UInt_t GetTypeOpt() const
Int_t GetLearnEntries()
Return the number of entries in the learning phase.
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:474
virtual Bool_t SendProcessingProgress(Double_t, Double_t, Bool_t=kFALSE)
void SetMemValues(Long_t vmem=-1, Long_t rmem=-1, Bool_t master=kFALSE)
Set max memory values.
Definition: TStatus.cxx:159
Long_t fFeedbackPeriod
timer for sending intermediate results
Definition: TProofPlayer.h:90
const double pi
Collectable string class.
Definition: TObjString.h:32
#define kPEX_ABORTED
float Float_t
Definition: RtypesCore.h:53
Long64_t fBytesRead
Definition: TProof.h:195
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
Definition: TH1.cxx:8051
void SetDir(const char *dir, Bool_t raw=kFALSE)
TProofLockPath * GetCacheLock()
Definition: TProofServ.h:295
return c
const char Option_t
Definition: RtypesCore.h:62
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
Definition: TSystem.cxx:4109
const char * GetTopSessionTag() const
Definition: TProofServ.h:260
float ymin
Definition: THbookFile.cxx:93
Long64_t GetMsgSizeHWM() const
Definition: TProofServ.h:288
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
This class represents a WWW compatible URL.
Definition: TUrl.h:41
virtual Bool_t ProcessCut(Long64_t)
Definition: TSelector.cxx:262
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
void SetWriteV3(Bool_t on=kTRUE)
Set/Reset the &#39;OldStreamer&#39; bit in this instance and its elements.
Definition: TDSet.cxx:1856
static void FilterLocalroot(TString &path, const char *url="root://dum/")
If &#39;path&#39; is local and &#39;dsrv&#39; is Xrootd, apply &#39;path.Localroot&#39; settings, if any. ...
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:248
TQueryResult * GetQueryResult(const char *ref)
Get query result instances referenced &#39;ref&#39; from the list of results.
virtual Bool_t JoinProcess(TList *workers)
Not implemented: meaningful only in the remote player. Returns kFALSE.
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg)
Check the memory usage, if requested.
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:157
const char * GetProtocol() const
Definition: TUrl.h:73
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
Bool_t fSaveResultsPerPacket
Definition: TProofPlayer.h:119
TProofPlayer(TProof *proof=0)
Default ctor.
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition: TNamed.cxx:131
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Bool_t GetValid() const
Definition: TDSet.h:121
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop process after this event.
TH1 * h
Definition: legend2.C:5
TQueryResult * fPreviousQuery
Definition: TProofPlayer.h:103
Bool_t IsRegister() const
TSocket * GetSocket() const
Definition: TProofServ.h:271
void Add(const char *mesg)
Add an error message.
Definition: TStatus.cxx:46
Double_t GetProcTime() const
TDatime GetStartTime() const
Definition: TQueryResult.h:125
virtual Bool_t Merge(Bool_t=kTRUE)
Merge the files.
void AddOutput(TList *out)
Incorporate output list (may not be used in this class).
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:50
void SetupFeedback()
Setup reporting of feedback objects.
virtual Int_t GetEntries() const
Definition: TCollection.h:92
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:581
static Long_t GetVirtMemMax()
VirtMemMax getter.
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:822
virtual void AddFirst(TObject *obj)
Add object at the beginning of the list.
Definition: TList.cxx:93
Long64_t GetEventsProcessed() const
Definition: TProofPlayer.h:224
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge objects in output the lists.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time. ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:857
void RemoveQueryResult(const char *ref)
Remove all query result instances referenced &#39;ref&#39; from the list of results.
TTimer * fProcTimeTimer
Definition: TProofPlayer.h:112
virtual Int_t GetNbinsZ() const
Definition: TH1.h:303
Bool_t HandleTimer(TTimer *timer)
Send progress and feedback to client.
#define gROOT
Definition: TROOT.h:364
Int_t AdoptFile(TFile *f)
Adopt a file already open.
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet for specified slave.
Long64_t GetFirst() const
Definition: TDSet.h:114
Float_t fEvtRateI
Definition: TProof.h:198
virtual int Load(const char *module, const char *entry="", Bool_t system=kFALSE)
Load a shared library.
Definition: TSystem.cxx:1819
const char * GetOrdinal() const
Definition: TSlave.h:135
Implement Tree drawing using PROOF.
Definition: TProofDraw.h:57
Basic string class.
Definition: TString.h:137
virtual Int_t GetNextPacket(Long64_t &first, Long64_t &num)=0
void SetReadCalls(Long64_t readCalls)
int Int_t
Definition: RtypesCore.h:41
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:997
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
#define ENDTRY
Definition: TException.h:73
Bool_t IsMerge() const
Bool_t IsMaster() const
Definition: TProof.h:966
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path