Logo ROOT   6.21/01
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofPlayer
13 \ingroup proofkernel
14 
15 Internal class steering processing in PROOF.
16 Instances of the TProofPlayer class are created on the worker nodes
17 per session and do the processing.
18 Instances of its subclass - TProofPlayerRemote are created per each
19 query on the master(s) and on the client. On the master(s),
20 TProofPlayerRemote coordinate processing, check the dataset, create
21 the packetizer and take care of merging the results of the workers.
22 The instance on the client collects information on the input
23 (dataset and selector), it invokes the Begin() method and finalizes
24 the query by calling Terminate().
25 
26 */
27 
28 #include "TProofDraw.h"
29 #include "TProofPlayer.h"
30 #include "THashList.h"
31 #include "TEnv.h"
32 #include "TEventIter.h"
33 #include "TVirtualPacketizer.h"
34 #include "TSelector.h"
35 #include "TSocket.h"
36 #include "TProofServ.h"
37 #include "TProof.h"
38 #include "TProofOutputFile.h"
39 #include "TProofSuperMaster.h"
40 #include "TSlave.h"
41 #include "TClass.h"
42 #include "TROOT.h"
43 #include "TError.h"
44 #include "TException.h"
45 #include "MessageTypes.h"
46 #include "TMessage.h"
47 #include "TDSetProxy.h"
48 #include "TString.h"
49 #include "TSystem.h"
50 #include "TFile.h"
51 #include "TFileCollection.h"
52 #include "TFileInfo.h"
53 #include "TFileMerger.h"
54 #include "TProofDebug.h"
55 #include "TTimer.h"
56 #include "TMap.h"
57 #include "TPerfStats.h"
58 #include "TStatus.h"
59 #include "TEventList.h"
60 #include "TProofLimitsFinder.h"
61 #include "THashList.h"
62 #include "TSortedList.h"
63 #include "TTree.h"
64 #include "TEntryList.h"
65 #include "TDSet.h"
66 #include "TDrawFeedback.h"
67 #include "TNamed.h"
68 #include "TObjString.h"
69 #include "TQueryResult.h"
70 #include "TMD5.h"
71 #include "TMethodCall.h"
72 #include "TObjArray.h"
73 #include "TH1.h"
74 #include "TVirtualMonitoring.h"
75 #include "TParameter.h"
77 #include "TStopwatch.h"
78 
79 // Timeout exception
80 #define kPEX_STOPPED 1001
81 #define kPEX_ABORTED 1002
82 
83 // To flag an abort condition: use a local static variable to avoid
84 // warnings about problems with longjumps
85 static Bool_t gAbort = kFALSE;
86 
87 class TAutoBinVal : public TNamed {
88 private:
89  Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
90 
91 public:
92  TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
93  Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
94  {
95  fXmin = xmin; fXmax = xmax;
96  fYmin = ymin; fYmax = ymax;
97  fZmin = zmin; fZmax = zmax;
98  }
99  void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
100  Double_t& ymax, Double_t& zmin, Double_t& zmax)
101  {
102  xmin = fXmin; xmax = fXmax;
103  ymin = fYmin; ymax = fYmax;
104  zmin = fZmin; zmax = fZmax;
105  }
106 
107 };
108 
109 //
110 // Special timer to dispatch pending events while processing
111 ////////////////////////////////////////////////////////////////////////////////
112 
113 class TDispatchTimer : public TTimer {
114 private:
115  TProofPlayer *fPlayer;
116 
117 public:
118  TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
119 
120  Bool_t Notify();
121 };
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Handle expiration of the timer associated with dispatching pending
124 /// events while processing. We must act as fast as possible here, so
125 /// we just set a flag submitting a request for dispatching pending events
126 
127 Bool_t TDispatchTimer::Notify()
128 {
129  if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
130 
131  fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
132 
133  // Needed for the next shot
134  Reset();
135  return kTRUE;
136 }
137 
138 //
139 // Special timer to notify reach of max packet proc time
140 ////////////////////////////////////////////////////////////////////////////////
141 
142 class TProctimeTimer : public TTimer {
143 private:
144  TProofPlayer *fPlayer;
145 
146 public:
147  TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
148 
149  Bool_t Notify();
150 };
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Handle expiration of the timer associated with dispatching pending
153 /// events while processing. We must act as fast as possible here, so
154 /// we just set a flag submitting a request for dispatching pending events
155 
156 Bool_t TProctimeTimer::Notify()
157 {
158  if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
159 
160  fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
161 
162  // One shot only
163  return kTRUE;
164 }
165 
166 //
167 // Special timer to handle stop/abort request via exception raising
168 ////////////////////////////////////////////////////////////////////////////////
169 
170 class TStopTimer : public TTimer {
171 private:
172  Bool_t fAbort;
173  TProofPlayer *fPlayer;
174 
175 public:
176  TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
177 
178  Bool_t Notify();
179 };
180 
181 ////////////////////////////////////////////////////////////////////////////////
182 /// Constructor for the timer to stop/abort processing.
183 /// The 'timeout' is in seconds.
184 /// Make sure that 'to' make sense, i.e. not larger than 10 days;
185 /// the minimum value is 10 ms (0 does not seem to start the timer ...).
186 
187 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
188  : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
189 {
190  if (gDebug > 0)
191  Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
192 
193  fPlayer = p;
194  fAbort = abort;
195 
196  if (gDebug > 1)
197  Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
198 }
199 
200 ////////////////////////////////////////////////////////////////////////////////
201 /// Handle the signal coming from the expiration of the timer
202 /// associated with an abort or stop request.
203 /// We raise an exception which will be processed in the
204 /// event loop.
205 
206 Bool_t TStopTimer::Notify()
207 {
208  if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
209 
210  if (fAbort)
212  else
214 
215  return kTRUE;
216 }
217 
218 //------------------------------------------------------------------------------
219 
221 
223 
224 ////////////////////////////////////////////////////////////////////////////////
225 /// Default ctor.
226 
228  : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
229  fFeedbackTimer(0), fFeedbackPeriod(2000),
230  fEvIter(0), fSelStatus(0),
231  fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
232  fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
233  fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
234  fProcTimeTimer(0), fProcTime(0),
235  fOutputFile(0),
236  fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
237 {
238  fInput = new TList;
245 
246  static Bool_t initLimitsFinder = kFALSE;
247  if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
249  initLimitsFinder = kTRUE;
250  }
251 
252 }
253 
254 ////////////////////////////////////////////////////////////////////////////////
255 /// Destructor.
256 
258 {
259  fInput->Clear("nodelete");
261  // The output list is owned by fSelector and destroyed in there
270 }
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Set processing bit according to 'on'
274 
276 {
277  if (on)
279  else
281 }
282 
283 ////////////////////////////////////////////////////////////////////////////////
284 /// Stop the process after this event. If timeout is positive, start
285 /// a timer firing after timeout seconds to hard-stop time-expensive
286 /// events.
287 
289 {
290  if (gDebug > 0)
291  Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
292 
293  if (fEvIter != 0)
294  fEvIter->StopProcess(abort);
295  Long_t to = 1;
296  if (abort == kTRUE) {
298  } else {
300  to = timeout;
301  }
302  // Start countdown, if needed
303  if (to > 0)
304  SetStopTimer(kTRUE, abort, to);
305 }
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Enable/disable the timer to dispatch pening events while processing.
309 
311 {
314  if (on) {
315  fDispatchTimer = new TDispatchTimer(this);
317  }
318 }
319 
320 ////////////////////////////////////////////////////////////////////////////////
321 /// Enable/disable the timer to stop/abort processing.
322 /// The 'timeout' is in seconds.
323 
325 {
326  std::lock_guard<std::mutex> lock(fStopTimerMtx);
327 
328  // Clean-up the timer
330  if (on) {
331  // create timer
332  fStopTimer = new TStopTimer(this, abort, timeout);
333  // Start the countdown
334  fStopTimer->Start();
335  if (gDebug > 0)
336  Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
337  (abort ? "ABORT" : "STOP"), timeout);
338  } else {
339  if (gDebug > 0)
340  Info ("SetStopTimer", "timer STOPPED");
341  }
342 }
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 /// Add query result to the list, making sure that there are no
346 /// duplicates.
347 
349 {
350  if (!q) {
351  Warning("AddQueryResult","query undefined - do nothing");
352  return;
353  }
354 
355  // Treat differently normal and draw queries
356  if (!(q->IsDraw())) {
357  if (!fQueryResults) {
358  fQueryResults = new TList;
359  fQueryResults->Add(q);
360  } else {
361  TIter nxr(fQueryResults);
362  TQueryResult *qr = 0;
363  TQueryResult *qp = 0;
364  while ((qr = (TQueryResult *) nxr())) {
365  // If same query, remove old version and break
366  if (*qr == *q) {
367  fQueryResults->Remove(qr);
368  delete qr;
369  break;
370  }
371  // Record position according to start time
372  if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
373  qp = qr;
374  }
375 
376  if (!qp) {
378  } else {
379  fQueryResults->AddAfter(qp, q);
380  }
381  }
382  } else if (IsClient()) {
383  // If max reached, eliminate first the oldest one
385  TIter nxr(fQueryResults);
386  TQueryResult *qr = 0;
387  while ((qr = (TQueryResult *) nxr())) {
388  // If same query, remove old version and break
389  if (qr->IsDraw()) {
390  fDrawQueries--;
391  fQueryResults->Remove(qr);
392  delete qr;
393  break;
394  }
395  }
396  }
397  // Add new draw query
398  if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
399  fDrawQueries++;
400  if (!fQueryResults)
401  fQueryResults = new TList;
402  fQueryResults->Add(q);
403  }
404  }
405 }
406 
407 ////////////////////////////////////////////////////////////////////////////////
408 /// Remove all query result instances referenced 'ref' from
409 /// the list of results.
410 
411 void TProofPlayer::RemoveQueryResult(const char *ref)
412 {
413  if (fQueryResults) {
414  TIter nxq(fQueryResults);
415  TQueryResult *qr = 0;
416  while ((qr = (TQueryResult *) nxq())) {
417  if (qr->Matches(ref)) {
418  fQueryResults->Remove(qr);
419  delete qr;
420  }
421  }
422  }
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Get query result instances referenced 'ref' from
427 /// the list of results.
428 
430 {
431  if (fQueryResults) {
432  if (ref && strlen(ref) > 0) {
433  TIter nxq(fQueryResults);
434  TQueryResult *qr = 0;
435  while ((qr = (TQueryResult *) nxq())) {
436  if (qr->Matches(ref))
437  return qr;
438  }
439  } else {
440  // Get last
441  return (TQueryResult *) fQueryResults->Last();
442  }
443  }
444 
445  // Nothing found
446  return (TQueryResult *)0;
447 }
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Set current query and save previous value.
451 
453 {
455  fQuery = q;
456 }
457 
458 ////////////////////////////////////////////////////////////////////////////////
459 /// Add object to input list.
460 
462 {
463  fInput->Add(inp);
464 }
465 
466 ////////////////////////////////////////////////////////////////////////////////
467 /// Clear input list.
468 
470 {
471  fInput->Clear();
472 }
473 
474 ////////////////////////////////////////////////////////////////////////////////
475 /// Get output object by name.
476 
478 {
479  if (fOutput)
480  return fOutput->FindObject(name);
481  return 0;
482 }
483 
484 ////////////////////////////////////////////////////////////////////////////////
485 /// Get output list.
486 
488 {
489  TList *ol = fOutput;
490  if (!ol && fQuery)
491  ol = fQuery->GetOutputList();
492  return ol;
493 }
494 
495 ////////////////////////////////////////////////////////////////////////////////
496 /// Reinitialize fSelector using the selector files in the query result.
497 /// Needed when Finalize is called after a Process execution for the same
498 /// selector name.
499 
501 {
502  Int_t rc = 0;
503 
504  // Make sure we have a query
505  if (!qr) {
506  Info("ReinitSelector", "query undefined - do nothing");
507  return -1;
508  }
509 
510  // Selector name
511  TString selec = qr->GetSelecImp()->GetName();
512  if (selec.Length() <= 0) {
513  Info("ReinitSelector", "selector name undefined - do nothing");
514  return -1;
515  }
516 
517  // Find out if this is a standard selection used for Draw actions
518  Bool_t stdselec = TSelector::IsStandardDraw(selec);
519 
520  // Find out if this is a precompiled selector: in such a case we do not
521  // have the code in TMacros, so we must rely on local libraries
522  Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
523 
524  // If not, find out if it needs to be expanded
525  TString ipathold;
526  if (!stdselec && !compselec) {
527  // Check checksums for the versions of the selector files
528  Bool_t expandselec = kTRUE;
529  TString dir, ipath;
530  char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
531  if (selc) {
532  // Check checksums
533  TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
534  // Implementation files
535  md5icur = TMD5::FileChecksum(selc);
536  md5iold = qr->GetSelecImp()->Checksum();
537  // Header files
538  TString selh(selc);
539  Int_t dot = selh.Last('.');
540  if (dot != kNPOS) selh.Remove(dot);
541  selh += ".h";
543  md5hcur = TMD5::FileChecksum(selh);
544  md5hold = qr->GetSelecHdr()->Checksum();
545 
546  // If nothing has changed nothing to do
547  if (md5hcur && md5hold && md5icur && md5iold)
548  if (*md5hcur == *md5hold && *md5icur == *md5iold)
549  expandselec = kFALSE;
550 
551  SafeDelete(md5icur);
552  SafeDelete(md5hcur);
553  SafeDelete(md5iold);
554  SafeDelete(md5hold);
555  if (selc) delete [] selc;
556  }
557 
558  Bool_t ok = kTRUE;
559  // Expand selector files, if needed
560  if (expandselec) {
561 
562  ok = kFALSE;
563  // Expand files in a temporary directory
564  TUUID u;
565  dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
566  if (!(gSystem->MakeDirectory(dir))) {
567 
568  // Export implementation file
569  selec = Form("%s/%s",dir.Data(),selec.Data());
570  qr->GetSelecImp()->SaveSource(selec);
571 
572  // Export header file
573  TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
574  qr->GetSelecHdr()->SaveSource(seleh);
575 
576  // Adjust include path
577  ipathold = gSystem->GetIncludePath();
578  ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
579  gSystem->SetIncludePath(ipath.Data());
580 
581  ok = kTRUE;
582  }
583  }
584  TString opt(qr->GetOptions());
585  Ssiz_t id = opt.Last('#');
586  if (id != kNPOS && id < opt.Length() - 1)
587  selec += opt(id + 1, opt.Length());
588 
589  if (!ok) {
590  Info("ReinitSelector", "problems locating or exporting selector files");
591  return -1;
592  }
593  }
594 
595  // Cleanup previous stuff
597  fSelectorClass = 0;
598 
599  // Init the selector now
600  Int_t iglevelsave = gErrorIgnoreLevel;
601  if (compselec)
602  // Silent error printout on first attempt
604 
605  if ((fSelector = TSelector::GetSelector(selec))) {
606  if (compselec)
607  gErrorIgnoreLevel = iglevelsave; // restore ignore level
608  fSelectorClass = fSelector->IsA();
610 
611  } else {
612  if (compselec) {
613  gErrorIgnoreLevel = iglevelsave; // restore ignore level
614  // Retry by loading first the libraries listed in TQueryResult, if any
615  if (strlen(qr->GetLibList()) > 0) {
616  TString sl(qr->GetLibList());
617  TObjArray *oa = sl.Tokenize(" ");
618  if (oa) {
619  Bool_t retry = kFALSE;
620  TIter nxl(oa);
621  TObjString *os = 0;
622  while ((os = (TObjString *) nxl())) {
623  TString lib = gSystem->BaseName(os->GetName());
624  if (lib != "lib") {
625  lib.ReplaceAll("-l", "lib");
626  if (gSystem->Load(lib) == 0)
627  retry = kTRUE;
628  }
629  }
630  // Retry now, if the case
631  if (retry)
633  }
634  }
635  }
636  if (!fSelector) {
637  if (compselec)
638  Info("ReinitSelector", "compiled selector re-init failed:"
639  " automatic reload unsuccessful:"
640  " please load manually the correct library");
641  rc = -1;
642  }
643  }
644  if (fSelector) {
645  // Draw needs to reinit temp histos
647  if (stdselec) {
648  ((TProofDraw *)fSelector)->DefVar();
649  } else {
650  // variables may have been initialized in Begin()
651  fSelector->Begin(0);
652  }
653  }
654 
655  // Restore original include path, if needed
656  if (ipathold.Length() > 0)
657  gSystem->SetIncludePath(ipathold.Data());
658 
659  return rc;
660 }
661 
662 ////////////////////////////////////////////////////////////////////////////////
663 /// Incorporate output object (may not be used in this class).
664 
666 {
667  MayNotUse("AddOutputObject");
668  return -1;
669 }
670 
671 ////////////////////////////////////////////////////////////////////////////////
672 /// Incorporate output list (may not be used in this class).
673 
675 {
676  MayNotUse("AddOutput");
677 }
678 
679 ////////////////////////////////////////////////////////////////////////////////
680 /// Store output list (may not be used in this class).
681 
683 {
684  MayNotUse("StoreOutput");
685 }
686 
687 ////////////////////////////////////////////////////////////////////////////////
688 /// Store feedback list (may not be used in this class).
689 
691 {
692  MayNotUse("StoreFeedback");
693 }
694 
695 ////////////////////////////////////////////////////////////////////////////////
696 /// Report progress (may not be used in this class).
697 
698 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
699 {
700  MayNotUse("Progress");
701 }
702 
703 ////////////////////////////////////////////////////////////////////////////////
704 /// Report progress (may not be used in this class).
705 
706 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
707  Long64_t /*bytesread*/,
708  Float_t /*evtRate*/, Float_t /*mbRate*/,
709  Float_t /*evtrti*/, Float_t /*mbrti*/)
710 {
711  MayNotUse("Progress");
712 }
713 
714 ////////////////////////////////////////////////////////////////////////////////
715 /// Report progress (may not be used in this class).
716 
718 {
719  MayNotUse("Progress");
720 }
721 
722 ////////////////////////////////////////////////////////////////////////////////
723 /// Set feedback list (may not be used in this class).
724 
726 {
727  MayNotUse("Feedback");
728 }
729 
730 ////////////////////////////////////////////////////////////////////////////////
731 /// Draw feedback creation proxy. When accessed via TProof avoids
732 /// link dependency on libProofPlayer.
733 
735 {
736  return new TDrawFeedback(p);
737 }
738 
739 ////////////////////////////////////////////////////////////////////////////////
740 /// Set draw feedback option.
741 
743 {
744  if (f)
745  f->SetOption(opt);
746 }
747 
748 ////////////////////////////////////////////////////////////////////////////////
749 /// Delete draw feedback object.
750 
752 {
753  delete f;
754 }
755 
756 ////////////////////////////////////////////////////////////////////////////////
757 /// Save the partial results of this query to a dedicated file under the user
758 /// data directory. The file name has the form
759 /// <session_tag>.q<query_seq_num>.root
760 /// The file pat and the file are created if not existing already.
761 /// Only objects in the outputlist not being TProofOutputFile are saved.
762 /// The packets list 'packets' is saved if given.
763 /// Trees not attached to any file are attached to the open file.
764 /// If 'queryend' is kTRUE evrything is written out (TTrees included).
765 /// The actual saving action is controlled by 'force' and by fSavePartialResults /
766 /// fSaveResultsPerPacket:
767 ///
768 /// fSavePartialResults = kFALSE/kTRUE no-saving/saving
769 /// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
770 ///
771 /// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
772 /// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
773 /// are always saved and expensive calls to TSystem::GetProcInfo saved.
774 /// The switch fSaveResultsPerPacket is instead controlled by the user or admin
775 /// who can also force saving in all cases; parameter PROOF_SavePartialResults or
776 /// RC env ProofPlayer.SavePartialResults .
777 /// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
778 /// are ignored.
779 /// Return -1 in case of problems, 0 otherwise.
780 
782 {
783  Bool_t save = (force || (fSavePartialResults &&
784  (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
785  if (!save) {
786  PDB(kOutput, 2)
787  Info("SavePartialResults", "partial result saving disabled");
788  return 0;
789  }
790 
791  // Sanity check
792  if (!gProofServ) {
793  Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
794  return -1;
795  }
796  if (!fOutput) {
797  Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
798  return -1;
799  }
800 
801  PDB(kOutput, 1)
802  Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
803  queryend, force, fSavePartialResults, fSaveResultsPerPacket);
804 
805  // Get list of processed packets from the iterator
806  PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
807 
808  TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
809  PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
810  packets, (packets ? packets->GetSize(): -1));
811 
812  // Open the file
813  const char *oopt = "UPDATE";
814  // Check if the file has already been defined
815  TString baseName(fOutputFilePath);
816  if (fOutputFilePath.IsNull()) {
817  baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
818  if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
819  fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
821  } else {
822  fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
823  }
824  Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
825  oopt = "RECREATE";
826  }
827  // Open the file in write mode
828  if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
829  (fOutputFile && fOutputFile->IsZombie())) {
830  Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
832  return -1;
833  }
834 
835  // Save current directory
836  TDirectory *curdir = gDirectory;
837  fOutputFile->cd();
838 
839  // Write first the packets list, if required
840  if (packets) {
841  TDirectory *packetsDir = fOutputFile->mkdir("packets");
842  if (packetsDir) packetsDir->cd();
844  fOutputFile->cd();
845  }
846 
847  Bool_t notempty = kFALSE;
848  // Write out the output list
849  TList torm;
850  TIter nxo(fOutput);
851  TObject *o = 0;
852  while ((o = nxo())) {
853  // Skip output file drivers
854  if (o->InheritsFrom(TProofOutputFile::Class())) continue;
855  // Skip control objets
856  if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
857  // Skip data members mapping
859  // Skip missing file info
860  if (!strcmp(o->GetName(), "MissingFiles")) continue;
861  // Trees need a special treatment
862  if (o->InheritsFrom("TTree")) {
863  TTree *t = (TTree *) o;
864  TDirectory *d = t->GetDirectory();
865  // If the tree is not attached to any file ...
866  if (!d || (d && !d->InheritsFrom("TFile"))) {
867  // ... we attach it
869  }
870  if (t->GetDirectory() == fOutputFile) {
871  if (queryend) {
872  // ... we write it out
873  o->Write(0, TObject::kOverwrite);
874  // At least something in the file
875  notempty = kTRUE;
876  // Flag for removal from the outputlist
877  torm.Add(o);
878  // Prevent double-deletion attempts
879  t->SetDirectory(0);
880  } else {
881  // ... or we set in automatic flush mode
882  t->SetAutoFlush();
883  }
884  }
885  } else if (queryend || fSaveResultsPerPacket) {
886  // Save overwriting what's already there
887  o->Write(0, TObject::kOverwrite);
888  // At least something in the file
889  notempty = kTRUE;
890  // Flag for removal from the outputlist
891  if (queryend) torm.Add(o);
892  }
893  }
894 
895  // Restore previous directory
896  gDirectory = curdir;
897 
898  // Close the file if required
899  if (notempty) {
900  if (!fOutput->FindObject(baseName)) {
901  TProofOutputFile *po = 0;
902  // Get directions
903  TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
904  TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
905  if (nm && oname.BeginsWith("ds:")) {
906  oname.Replace(0, 3, "");
907  TString qtag =
909  oname.ReplaceAll("<qtag>", qtag);
910  // Create the TProofOutputFile for dataset creation
911  po = new TProofOutputFile(baseName, "DRO", oname.Data());
912  } else {
913  Bool_t hasddir = kFALSE;
914  // Create the TProofOutputFile for automatic merging
915  po = new TProofOutputFile(baseName, "M");
916  if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
917  if (gProofServ->IsTopMaster()) {
918  if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
919  TString dsrv;
921  TProofServ::FilterLocalroot(oname, dsrv);
922  oname.Insert(0, dsrv);
923  }
924  } else {
925  if (nm) {
926  // The name has been sent by the client: resolve local place holders
927  oname.ReplaceAll("<file>", baseName);
928  } else {
929  // We did not get any indication; the final file will be in the datadir on
930  // the top master and it will be resolved there
931  oname.Form("<datadir>/%s", baseName.Data());
932  hasddir = kTRUE;
933  }
934  }
935  po->SetOutputFileName(oname.Data());
936  if (hasddir)
937  // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
939  po->SetName(gSystem->BaseName(oname.Data()));
940  }
941  po->AdoptFile(fOutputFile);
942  fOutput->Add(po);
943  // Flag the nature of this file
945  }
946  }
947  fOutputFile->Close();
949 
950  // If last call, cleanup the output list from objects saved to file
951  if (queryend && torm.GetSize() > 0) {
952  TIter nxrm(&torm);
953  while ((o = nxrm())) { fOutput->Remove(o); }
954  }
955  torm.SetOwner(kFALSE);
956 
957  PDB(kOutput, 1)
958  Info("SavePartialResults", "partial results saved to file");
959  // We are done
960  return 0;
961 }
962 
963 ////////////////////////////////////////////////////////////////////////////////
964 /// Make sure that a valid selector object
965 /// Return -1 in case of problems, 0 otherwise
966 
967 Int_t TProofPlayer::AssertSelector(const char *selector_file)
968 {
969  if (selector_file && strlen(selector_file)) {
971 
972  // Get selector files from cache
973  TString ocwd = gSystem->WorkingDirectory();
974  if (gProofServ) {
977  }
978 
979  fSelector = TSelector::GetSelector(selector_file);
980 
981  if (gProofServ) {
982  gSystem->ChangeDirectory(ocwd);
984  }
985 
986  if (!fSelector) {
987  Error("AssertSelector", "cannot load: %s", selector_file );
988  return -1;
989  }
990 
992  Info("AssertSelector", "Processing via filename (%s)", selector_file);
993  } else if (!fSelector) {
994  Error("AssertSelector", "no TSelector object define : cannot continue!");
995  return -1;
996  } else {
997  Info("AssertSelector", "Processing via TSelector object");
998  }
999  // Done
1000  return 0;
1001 }
1002 ////////////////////////////////////////////////////////////////////////////////
1003 /// Update fProgressStatus
1004 
1006 {
1007  if (fProgressStatus) {
1012  if (gMonitoringWriter)
1015  fProcessedRun = 0;
1016  }
1017 }
1018 
1019 ////////////////////////////////////////////////////////////////////////////////
1020 /// Process specified TDSet on PROOF worker.
1021 /// The return value is -1 in case of error and TSelector::GetStatus()
1022 /// in case of success.
1023 
1024 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1025  Option_t *option, Long64_t nentries,
1026  Long64_t first)
1027 {
1028  PDB(kGlobal,1) Info("Process","Enter");
1029 
1031  fOutput = 0;
1032 
1033  TCleanup clean(this);
1034 
1035  fSelectorClass = 0;
1036  TString wmsg;
1037  TRY {
1038  if (AssertSelector(selector_file) != 0 || !fSelector) {
1039  Error("Process", "cannot assert the selector object");
1040  return -1;
1041  }
1042 
1043  fSelectorClass = fSelector->IsA();
1044  Int_t version = fSelector->Version();
1045  if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1046 
1048 
1049  if (gProofServ)
1051 
1052  fSelStatus = new TStatus;
1054 
1055  fSelector->SetOption(option);
1057 
1058  // If in sequential (0-PROOF) mode validate the data set to get
1059  // the number of entries
1061  if (fTotalEvents < 0 && gProofServ &&
1063  dset->Validate();
1064  dset->Reset();
1065  TDSetElement *e = 0;
1066  while ((e = dset->Next())) {
1067  fTotalEvents += e->GetNum();
1068  }
1069  }
1070 
1071  dset->Reset();
1072 
1073  // Set parameters controlling the iterator behaviour
1074  Int_t useTreeCache = 1;
1075  if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1076  if (useTreeCache > -1 && useTreeCache < 2)
1077  gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1078  }
1079  Long64_t cacheSize = -1;
1080  if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1081  TString sz = TString::Format("%lld", cacheSize);
1082  gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1083  }
1084  // Parallel unzipping
1085  Int_t useParallelUnzip = 0;
1086  if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1087  if (useParallelUnzip > -1 && useParallelUnzip < 2)
1088  gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1089  }
1090  // OS file caching (Mac Os X only)
1091  Int_t dontCacheFiles = 0;
1092  if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1093  if (dontCacheFiles == 1)
1094  gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1095  }
1097 
1098  // Control file object swap
1099  // <how>*10 + <force>
1100  // <how> = 0 end of run
1101  // 1 after each packet
1102  // <force> = 0 no, swap only if memory threshold is reached
1103  // 1 swap in all cases, accordingly to <how>
1104  Int_t opt = 0;
1105  if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1106  opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1107  }
1108  fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1109  fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1110  Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1111 
1112  // Memory threshold for file object swap
1113  Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1114  if (memfrac > 0.) {
1115  // The threshold is per core
1116  SysInfo_t si;
1117  if (gSystem->GetSysInfo(&si) == 0) {
1118  fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1119  Info("Process", "memory threshold for saving objects to file set to %ld kB",
1121  } else {
1122  Error("Process", "cannot get SysInfo_t (!)");
1123  }
1124  }
1125 
1126  if (version == 0) {
1127  PDB(kLoop,1) Info("Process","Call Begin(0)");
1128  fSelector->Begin(0);
1129  } else {
1130  if (IsClient()) {
1131  // on client (for local run)
1132  PDB(kLoop,1) Info("Process","Call Begin(0)");
1133  fSelector->Begin(0);
1134  }
1136  PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1137  fSelector->SlaveBegin(0); // Init is called explicitly
1138  // from GetNextEvent()
1139  }
1140  }
1141 
1142  } CATCH(excode) {
1144  Error("Process","exception %d caught", excode);
1146  return -1;
1147  } ENDTRY;
1148 
1149  // Save the results, if needed, closing the file
1150  if (SavePartialResults(kFALSE) < 0)
1151  Warning("Process", "problems seetting up file-object swapping");
1152 
1153  // Create feedback lists, if required
1154  SetupFeedback();
1155 
1156  if (gMonitoringWriter)
1158 
1159  PDB(kLoop,1)
1160  Info("Process","Looping over Process()");
1161 
1162  // get the byte read counter at the beginning of processing
1165  fProcessedRun = 0;
1166  // force the first monitoring info
1167  if (gMonitoringWriter)
1169 
1170  // Start asynchronous timer to dispatch pending events
1172 
1173  // Loop over range
1174  gAbort = kFALSE;
1175  Long64_t entry;
1178 
1179  TRY {
1180 
1181  Int_t mrc = -1;
1182  // Get the frequency for checking memory consumption and logging information
1183  Long64_t memlogfreq = -1;
1184  if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1185  Long64_t singleshot = 1;
1186  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1187  TString lastMsg("(unfortunately no detailed info is available about current packet)");
1188 
1189  // Initial memory footprint
1190  if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1191  Error("Process", "%s", wmsg.Data());
1192  wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ ? gProofServ->GetOrdinal() : "gProofServ is nullptr"));
1193  fSelStatus->Add(wmsg.Data());
1194  if (gProofServ) {
1195  gProofServ->SendAsynMessage(wmsg.Data());
1197  }
1200  } else if (!wmsg.IsNull()) {
1201  Warning("Process", "%s", wmsg.Data());
1202  }
1203 
1204  TPair *currentElem = 0;
1205  // The event loop on the worker
1206  Long64_t fst = -1, num;
1207  Long_t maxproctime = -1;
1208  Bool_t newrun = kFALSE;
1209  while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1212  // This is needed by the inflate infrastructure to calculate
1213  // sleeping times
1215 
1216  // Give the possibility to the selector to access additional info in the
1217  // incoming packet
1218  if (dset->Current()) {
1219  if (!currentElem) {
1220  currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1221  fInput->Add(currentElem);
1222  } else {
1223  if (currentElem->Value() != dset->Current()) {
1224  currentElem->SetValue(dset->Current());
1225  } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1227  }
1228  }
1229  if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1230  if (dset->TestBit(TDSet::kEmpty)) {
1231  lastMsg = "check logs for possible stacktrace - last cycle:";
1232  } else {
1233  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1234  TString fn = (elem) ? elem->GetFileName() : "<undef>";
1235  lastMsg.Form("while processing dset:'%s', file:'%s'"
1236  " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1237  }
1238  TProofServ::SetLastMsg(lastMsg);
1239  }
1240  // Set the max proc time, if any
1241  if (dset->Current()->GetMaxProcTime() >= 0.)
1242  maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1243  newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1244  }
1245 
1248  // Setup packet proc time measurement
1249  if (maxproctime > 0) {
1250  if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1251  fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1252  if (!fProcTime) fProcTime = new TStopwatch();
1253  fProcTime->Reset(); // Reset counters
1254  }
1255  Long64_t refnum = num;
1256  if (refnum < 0 && maxproctime <= 0) {
1257  wmsg.Form("neither entries nor max proc time specified:"
1258  " risk of infinite loop: processing aborted");
1259  Error("Process", "%s", wmsg.Data());
1260  if (gProofServ) {
1261  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1263  gProofServ->SendAsynMessage(wmsg.Data());
1264  }
1267  break;
1268  }
1269  while (refnum < 0 || num--) {
1270 
1271  // Did we use all our time?
1273  fProcTime->Stop();
1274  if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1275  // How much are we left with?
1276  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1277  if (xleft < 0.2) {
1278  // Give another try, 1.5 times the remaining measured expected time
1279  Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1281  fProcTimeTimer->Start(mpt, kTRUE); // One shot
1283  }
1284  }
1286  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1287  maxproctime, lastMsg.Data());
1288 
1289  break;
1290  }
1291  }
1292 
1293  if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1294  fSelector->GetAbort() == TSelector::kContinue)) break;
1295 
1296  // Get the netry number, taking into account entry or event lists
1297  entry = fEvIter->GetEntryNumber(fst);
1298  fst++;
1299 
1300  // Set the last entry
1301  TProofServ::SetLastEntry(entry);
1302 
1303  if (fSelector->Version() == 0) {
1304  PDB(kLoop,3)
1305  Info("Process","Call ProcessCut(%lld)", entry);
1306  if (fSelector->ProcessCut(entry)) {
1307  PDB(kLoop,3)
1308  Info("Process","Call ProcessFill(%lld)", entry);
1309  fSelector->ProcessFill(entry);
1310  }
1311  } else {
1312  PDB(kLoop,3)
1313  Info("Process","Call Process(%lld)", entry);
1314  fSelector->Process(entry);
1317  break;
1318  } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1319  Info("Process", "packet processing aborted following the"
1320  " selector settings:\n%s", lastMsg.Data());
1323  }
1324  }
1326 
1327  // Check the memory footprint, if required
1328  if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1329  if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1330  Error("Process", "%s", wmsg.Data());
1331  if (gProofServ) {
1332  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1333  gProofServ->GetOrdinal(), entry));
1334  gProofServ->SendAsynMessage(wmsg.Data());
1335  }
1339  break;
1340  } else {
1341  if (!wmsg.IsNull()) {
1342  Warning("Process", "%s", wmsg.Data());
1343  if (gProofServ) {
1344  wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1345  gProofServ->GetOrdinal(), entry));
1346  gProofServ->SendAsynMessage(wmsg.Data());
1347  }
1348  }
1349  }
1350  }
1354  }
1356  if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1357 
1358  // Make sure that the selector abort status is reset
1360  fSelector->Abort("status reset", TSelector::kContinue);
1361  }
1362  }
1363 
1364  } CATCH(excode) {
1365  if (excode == kPEX_STOPPED) {
1366  Info("Process","received stop-process signal");
1368  } else if (excode == kPEX_ABORTED) {
1369  gAbort = kTRUE;
1370  Info("Process","received abort-process signal");
1372  } else {
1373  Error("Process","exception %d caught", excode);
1374  // Perhaps we need a dedicated status code here ...
1375  gAbort = kTRUE;
1377  }
1379  } ENDTRY;
1380 
1381  // Clean-up the envelop for the current element
1382  TPair *currentElem = 0;
1383  if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1384  if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1385  delete currentElem->Key();
1386  delete currentElem;
1387  }
1388  }
1389 
1390  // Final memory footprint
1391  Long64_t singleshot = 1;
1392  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1393  Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1394  if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1395 
1396  PDB(kGlobal,2)
1397  Info("Process","%lld events processed", fProgressStatus->GetEntries());
1398 
1399  if (gMonitoringWriter) {
1403  }
1404 
1405  // Stop active timers
1407  if (fStopTimer != 0)
1409  if (fFeedbackTimer != 0)
1410  HandleTimer(0);
1411 
1412  StopFeedback();
1413 
1414  // Save the results, if needed, closing the file
1415  if (SavePartialResults(kTRUE) < 0)
1416  Warning("Process", "problems saving the results to file");
1417 
1419 
1420  // Finalize
1421 
1422  if (fExitStatus != kAborted) {
1423 
1424  TIter nxo(GetOutputList());
1425  TObject *o = 0;
1426  while ((o = nxo())) {
1427  // Special treatment for files
1428  if (o->IsA() == TProofOutputFile::Class()) {
1430  of->Print();
1432  const char *dir = of->GetDir();
1433  if (!dir || (dir && strlen(dir) <= 0)) {
1435  } else if (dir && strlen(dir) > 0) {
1436  TUrl u(dir);
1437  if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1438  !strcmp(u.GetHost(), "localhost.localdomain")) {
1439  u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
1440  of->SetDir(u.GetUrl(kTRUE));
1441  }
1442  of->Print();
1443  }
1444  }
1445  }
1446 
1448 
1450  if (fSelector->Version() == 0) {
1451  PDB(kLoop,1) Info("Process","Call Terminate()");
1452  fSelector->Terminate();
1453  } else {
1454  PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1457  PDB(kLoop,1) Info("Process","Call Terminate()");
1458  fSelector->Terminate();
1459  }
1460  }
1461  }
1462 
1463  // Add Selector status in the output list so it can be returned to the client as done
1464  // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1465  fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1466 
1467  if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1468  TIter nxc(gROOT->GetListOfCanvases());
1469  while (TObject *c = nxc())
1470  fOutput->Add(c);
1471  }
1472  }
1473 
1474  if (gProofServ)
1475  TPerfStats::Stop();
1476 
1477  return 0;
1478 }
1479 
1480 ////////////////////////////////////////////////////////////////////////////////
1481 /// Process specified TDSet on PROOF worker with TSelector object
1482 /// The return value is -1 in case of error and TSelector::GetStatus()
1483 /// in case of success.
1484 
1486  Option_t *option, Long64_t nentries,
1487  Long64_t first)
1488 {
1489  if (!selector) {
1490  Error("Process", "selector object undefiend!");
1491  return -1;
1492  }
1493 
1495  fSelector = selector;
1497  return Process(dset, (const char *)0, option, nentries, first);
1498 }
1499 
1500 ////////////////////////////////////////////////////////////////////////////////
1501 /// Not implemented: meaningful only in the remote player. Returns kFALSE.
1502 
1504 {
1505  return kFALSE;
1506 }
1507 
1508 ////////////////////////////////////////////////////////////////////////////////
1509 /// Check the memory usage, if requested.
1510 /// Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
1511 /// resident limits are depassed.
1512 
1514  Bool_t &w80v, TString &wmsg)
1515 {
1516  Long64_t processed = GetEventsProcessed() + fProcessedRun;
1517  if (mfreq > 0 && processed%mfreq == 0) {
1518  // Record the memory information
1519  ProcInfo_t pi;
1520  if (!gSystem->GetProcInfo(&pi)){
1521  wmsg = "";
1522  if (gProofServ)
1523  Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1524  pi.fMemVirtual, pi.fMemResident, processed);
1525  // Save info in TStatus
1526  fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1527  // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
1528  if (TProofServ::GetVirtMemMax() > 0) {
1529  if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
1530  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1531  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1532  return kFALSE;
1533  } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1534  // Refine monitoring
1535  mfreq = 1;
1536  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1537  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1538  w80v = kFALSE;
1539  }
1540  }
1541  // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
1542  if (TProofServ::GetResMemMax() > 0) {
1543  if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
1544  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1545  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1546  return kFALSE;
1547  } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1548  // Refine monitoring
1549  mfreq = 1;
1550  if (wmsg.Length() > 0) {
1551  wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1552  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1553  } else {
1554  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1555  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1556  }
1557  w80r = kFALSE;
1558  }
1559  }
1560  // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1561  // to TSystem::GetProcInfo in SavePartialResults
1562  if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1563  }
1564  }
1565  // Done
1566  return kTRUE;
1567 }
1568 
1569 ////////////////////////////////////////////////////////////////////////////////
1570 /// Finalize query (may not be used in this class).
1571 
1573 {
1574  MayNotUse("Finalize");
1575  return -1;
1576 }
1577 
1578 ////////////////////////////////////////////////////////////////////////////////
1579 /// Finalize query (may not be used in this class).
1580 
1582 {
1583  MayNotUse("Finalize");
1584  return -1;
1585 }
1586 ////////////////////////////////////////////////////////////////////////////////
1587 /// Merge output (may not be used in this class).
1588 
1590 {
1591  MayNotUse("MergeOutput");
1592  return;
1593 }
1594 
1595 ////////////////////////////////////////////////////////////////////////////////
1596 
1598 {
1600  fOutput->Add(olsdm);
1601 }
1602 
1603 ////////////////////////////////////////////////////////////////////////////////
1604 /// Update automatic binning parameters for given object "name".
1605 
1609  Double_t& zmin, Double_t& zmax)
1610 {
1611  if ( fAutoBins == 0 ) {
1612  fAutoBins = new THashList;
1613  }
1614 
1615  TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1616 
1617  if ( val == 0 ) {
1618  //look for info in higher master
1619  if (gProofServ && !gProofServ->IsTopMaster()) {
1620  TString key = name;
1622  }
1623 
1624  val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1625  fAutoBins->Add(val);
1626  } else {
1627  val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1628  }
1629 }
1630 
1631 ////////////////////////////////////////////////////////////////////////////////
1632 /// Get next packet (may not be used in this class).
1633 
1635 {
1636  MayNotUse("GetNextPacket");
1637  return 0;
1638 }
1639 
1640 ////////////////////////////////////////////////////////////////////////////////
1641 /// Set up feedback (may not be used in this class).
1642 
1644 {
1645  MayNotUse("SetupFeedback");
1646 }
1647 
1648 ////////////////////////////////////////////////////////////////////////////////
1649 /// Stop feedback (may not be used in this class).
1650 
1652 {
1653  MayNotUse("StopFeedback");
1654 }
1655 
1656 ////////////////////////////////////////////////////////////////////////////////
1657 /// Draw (may not be used in this class).
1658 
1659 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1660  const char * /*selection*/, Option_t * /*option*/,
1661  Long64_t /*nentries*/, Long64_t /*firstentry*/)
1662 {
1663  MayNotUse("DrawSelect");
1664  return -1;
1665 }
1666 
1667 ////////////////////////////////////////////////////////////////////////////////
1668 /// Handle tree header request.
1669 
1671 {
1672  MayNotUse("HandleGetTreeHeader|");
1673 }
1674 
1675 ////////////////////////////////////////////////////////////////////////////////
1676 /// Receive histo from slave.
1677 
1679 {
1680  TObject *obj = mess->ReadObject(mess->GetClass());
1681  if (obj->InheritsFrom(TH1::Class())) {
1682  TH1 *h = (TH1*)obj;
1683  h->SetDirectory(0);
1684  TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1685  if (horg)
1686  horg->Add(h);
1687  else
1688  h->SetDirectory(gDirectory);
1689  }
1690 }
1691 
1692 ////////////////////////////////////////////////////////////////////////////////
1693 /// Draw the object if it is a canvas.
1694 /// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1695 /// is not available.
1696 
1698 {
1699  static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1700 
1701  // Load the library the first time
1702  if (!gDrawCanvasHook) {
1703  // Load library needed for graphics ...
1704  TString drawlib = "libProofDraw";
1705  char *p = 0;
1706  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1707  delete[] p;
1708  if (gSystem->Load(drawlib) != -1) {
1709  // Locate DrawCanvas
1710  Func_t f = 0;
1711  if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1712  gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1713  else
1714  Warning("DrawCanvas", "can't find DrawCanvas");
1715  } else
1716  Warning("DrawCanvas", "can't load %s", drawlib.Data());
1717  } else
1718  Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1719  }
1720  if (gDrawCanvasHook && obj)
1721  return (*gDrawCanvasHook)(obj);
1722  // No drawing hook or object undefined
1723  return 1;
1724 }
1725 
1726 ////////////////////////////////////////////////////////////////////////////////
1727 /// Parse the arguments from var, sel and opt and fill the selector and
1728 /// object name accordingly.
1729 /// Return 0 in case of success, 1 if libProofDraw is not available.
1730 
1731 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1732  TString &selector, TString &objname)
1733 {
1734  static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1735  TString &, TString &) = 0;
1736 
1737  // Load the library the first time
1738  if (!gGetDrawArgsHook) {
1739  // Load library needed for graphics ...
1740  TString drawlib = "libProofDraw";
1741  char *p = 0;
1742  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1743  delete[] p;
1744  if (gSystem->Load(drawlib) != -1) {
1745  // Locate GetDrawArgs
1746  Func_t f = 0;
1747  if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1748  gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1749  TString &, TString &))(f);
1750  else
1751  Warning("GetDrawArgs", "can't find GetDrawArgs");
1752  } else
1753  Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1754  } else
1755  Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1756  }
1757  if (gGetDrawArgsHook)
1758  return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1759  // No parser hook or object undefined
1760  return 1;
1761 }
1762 
1763 ////////////////////////////////////////////////////////////////////////////////
1764 /// Create/destroy a named canvas for feedback
1765 
1766 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
1767 {
1768  static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1769 
1770  // Load the library the first time
1771  if (!gFeedBackCanvasHook) {
1772  // Load library needed for graphics ...
1773  TString drawlib = "libProofDraw";
1774  char *p = 0;
1775  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1776  delete[] p;
1777  if (gSystem->Load(drawlib) != -1) {
1778  // Locate FeedBackCanvas
1779  Func_t f = 0;
1780  if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1781  gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1782  else
1783  Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1784  } else
1785  Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1786  } else
1787  Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1788  }
1789  if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1790  // No parser hook or object undefined
1791  return;
1792 }
1793 
1794 ////////////////////////////////////////////////////////////////////////////////
1795 /// Return the size in bytes of the cache
1796 
1798 {
1799  if (fEvIter) return fEvIter->GetCacheSize();
1800  return -1;
1801 }
1802 
1803 ////////////////////////////////////////////////////////////////////////////////
1804 /// Return the number of entries in the learning phase
1805 
1807 {
1808  if (fEvIter) return fEvIter->GetLearnEntries();
1809  return -1;
1810 }
1811 
1812 ////////////////////////////////////////////////////////////////////////////////
1813 /// Switch on/off merge timer
1814 
1816 {
1817  if (on) {
1818  if (!fMergeSTW) fMergeSTW = new TStopwatch();
1819  PDB(kGlobal,1)
1820  Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1821  if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1823  } else if (fMergeSTW) {
1824  fMergeSTW->Stop();
1825  Float_t rt = fMergeSTW->RealTime();
1826  PDB(kGlobal,1)
1827  Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1828  if (fQuery) {
1829  if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1830  // On the master (or in Lite()) we set the merging time and the numebr of mergers
1831  fQuery->SetMergeTime(rt);
1833  } else {
1834  // In a standard client we save the transfer-to-client time
1835  fQuery->SetRecvTime(rt);
1836  }
1837  PDB(kGlobal,2) fQuery->Print("F");
1838  }
1839  }
1840 }
1841 
1842 //------------------------------------------------------------------------------
1843 
1845 
1846 ////////////////////////////////////////////////////////////////////////////////
1847 /// Process the specified TSelector object 'nentries' times.
1848 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1849 /// local session.
1850 /// The return value is -1 in case of error and TSelector::GetStatus()
1851 /// in case of success.
1852 
1854  Long64_t nentries, Option_t *option)
1855 {
1856  if (!selector) {
1857  Error("Process", "selector object undefiend!");
1858  return -1;
1859  }
1860 
1861  TDSetProxy *set = new TDSetProxy("", "", "");
1862  set->SetBit(TDSet::kEmpty);
1863  set->SetBit(TDSet::kIsLocal);
1864  Long64_t rc = Process(set, selector, option, nentries);
1865  SafeDelete(set);
1866 
1867  // Done
1868  return rc;
1869 }
1870 
1871 ////////////////////////////////////////////////////////////////////////////////
1872 /// Process the specified TSelector file 'nentries' times.
1873 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1874 /// local session.
1875 /// Process specified TDSet on PROOF worker with TSelector object
1876 /// The return value is -1 in case of error and TSelector::GetStatus()
1877 /// in case of success.
1878 
1880  Long64_t nentries, Option_t *option)
1881 {
1882  TDSetProxy *set = new TDSetProxy("", "", "");
1883  set->SetBit(TDSet::kEmpty);
1884  set->SetBit(TDSet::kIsLocal);
1885  Long64_t rc = Process(set, selector, option, nentries);
1886  SafeDelete(set);
1887 
1888  // Done
1889  return rc;
1890 }
1891 
1892 
1893 //------------------------------------------------------------------------------
1894 
1896 
1897 ////////////////////////////////////////////////////////////////////////////////
1898 /// Destructor.
1899 
1901 {
1902  SafeDelete(fOutput); // owns the output list
1904 
1905  // Objects stored in maps are already deleted when merging the feedback
1908 
1910 }
1911 
1912 ////////////////////////////////////////////////////////////////////////////////
1913 /// Init the packetizer
1914 /// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1915 
1917  Long64_t first, const char *defpackunit,
1918  const char *defpackdata)
1919 {
1921  PDB(kGlobal,1) Info("Process","Enter");
1922  fDSet = dset;
1924 
1925  // This is done here to pickup on the fly changes
1926  Int_t honebyone = 1;
1927  if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1928  honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1929  fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1930 
1931  Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1932 
1933  TString packetizer;
1934  TList *listOfMissingFiles = 0;
1935 
1936  TMethodCall callEnv;
1937  TClass *cl;
1938  noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1939 
1940  if (noData) {
1941 
1942  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1943  packetizer = defpackunit;
1944  else
1945  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1946 
1947  // Get linked to the related class
1948  cl = TClass::GetClass(packetizer);
1949  if (cl == 0) {
1950  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1952  return -1;
1953  }
1954 
1955  // Init the constructor
1956  callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1957  if (!callEnv.IsValid()) {
1958  Error("InitPacketizer",
1959  "cannot find correct constructor for '%s'", cl->GetName());
1961  return -1;
1962  }
1963  callEnv.ResetParam();
1965  callEnv.SetParam((Long64_t) nentries);
1966  callEnv.SetParam((Long_t) fInput);
1967  callEnv.SetParam((Long_t) fProgressStatus);
1968 
1969  } else if (dset->TestBit(TDSet::kMultiDSet)) {
1970 
1971  // We have to process many datasets in one go, keeping them separate
1972  if (fProof->GetRunStatus() != TProof::kRunning) {
1973  // We have been asked to stop
1974  Error("InitPacketizer", "received stop/abort request");
1976  return -1;
1977  }
1978 
1979  // The multi packetizer
1980  packetizer = "TPacketizerMulti";
1981 
1982  // Get linked to the related class
1983  cl = TClass::GetClass(packetizer);
1984  if (cl == 0) {
1985  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1987  return -1;
1988  }
1989 
1990  // Init the constructor
1991  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1992  if (!callEnv.IsValid()) {
1993  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1995  return -1;
1996  }
1997  callEnv.ResetParam();
1998  callEnv.SetParam((Long_t) dset);
2000  callEnv.SetParam((Long64_t) first);
2001  callEnv.SetParam((Long64_t) nentries);
2002  callEnv.SetParam((Long_t) fInput);
2003  callEnv.SetParam((Long_t) fProgressStatus);
2004 
2005  // We are going to test validity during the packetizer initialization
2006  dset->SetBit(TDSet::kValidityChecked);
2007  dset->ResetBit(TDSet::kSomeInvalid);
2008 
2009  } else {
2010 
2011  // Lookup - resolve the end-point urls to optmize the distribution.
2012  // The lookup was previously called in the packetizer's constructor.
2013  // A list for the missing files may already have been added to the
2014  // output list; otherwise, if needed it will be created inside
2015  if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2016  // Move it to the output list
2017  fInput->Remove(listOfMissingFiles);
2018  } else {
2019  listOfMissingFiles = new TList;
2020  }
2021  // Do the lookup; we only skip it if explicitly requested so.
2022  TString lkopt;
2023  if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2024  dset->Lookup(kTRUE, &listOfMissingFiles);
2025 
2026  if (fProof->GetRunStatus() != TProof::kRunning) {
2027  // We have been asked to stop
2028  Error("InitPacketizer", "received stop/abort request");
2030  return -1;
2031  }
2032 
2033  if (!(dset->GetListOfElements()) ||
2034  !(dset->GetListOfElements()->GetSize())) {
2035  if (gProofServ)
2036  gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2037  Error("InitPacketizer", "No files from the data set were found - Aborting");
2039  if (listOfMissingFiles) {
2040  listOfMissingFiles->SetOwner();
2041  fOutput->Remove(listOfMissingFiles);
2042  SafeDelete(listOfMissingFiles);
2043  }
2044  return -1;
2045  }
2046 
2047  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2048  // Using standard packetizer TAdaptivePacketizer
2049  packetizer = defpackdata;
2050  else
2051  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2052 
2053  // Get linked to the related class
2054  cl = TClass::GetClass(packetizer);
2055  if (cl == 0) {
2056  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2058  return -1;
2059  }
2060 
2061  // Init the constructor
2062  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2063  if (!callEnv.IsValid()) {
2064  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2066  return -1;
2067  }
2068  callEnv.ResetParam();
2069  callEnv.SetParam((Long_t) dset);
2071  callEnv.SetParam((Long64_t) first);
2072  callEnv.SetParam((Long64_t) nentries);
2073  callEnv.SetParam((Long_t) fInput);
2074  callEnv.SetParam((Long_t) fProgressStatus);
2075 
2076  // We are going to test validity during the packetizer initialization
2077  dset->SetBit(TDSet::kValidityChecked);
2078  dset->ResetBit(TDSet::kSomeInvalid);
2079  }
2080 
2081  // Get an instance of the packetizer
2082  Long_t ret = 0;
2083  callEnv.Execute(ret);
2084  if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2085  Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2087  return -1;
2088  }
2089 
2090  if (!fPacketizer->IsValid()) {
2091  Error("InitPacketizer",
2092  "instantiated packetizer object '%s' is invalid", cl->GetName());
2095  return -1;
2096  }
2097 
2098  // In multi mode retrieve the list of missing files
2099  if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2100  if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2101  // Remove it; it will be added to the output list
2102  fInput->Remove(listOfMissingFiles);
2103  }
2104  }
2105 
2106  if (!noData) {
2107  // Add invalid elements to the list of missing elements
2108  TDSetElement *elem = 0;
2109  if (dset->TestBit(TDSet::kSomeInvalid)) {
2110  TIter nxe(dset->GetListOfElements());
2111  while ((elem = (TDSetElement *)nxe())) {
2112  if (!elem->GetValid()) {
2113  if (!listOfMissingFiles)
2114  listOfMissingFiles = new TList;
2115  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2116  dset->Remove(elem, kFALSE);
2117  }
2118  }
2119  // The invalid elements have been removed
2121  }
2122 
2123  // Record the list of missing or invalid elements in the output list
2124  if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2125  TIter missingFiles(listOfMissingFiles);
2126  TString msg;
2127  if (gDebug > 0) {
2128  TFileInfo *fi = 0;
2129  while ((fi = (TFileInfo *) missingFiles.Next())) {
2130  if (fi->GetCurrentUrl()) {
2131  msg = Form("File not found: %s - skipping!",
2132  fi->GetCurrentUrl()->GetUrl());
2133  } else {
2134  msg = Form("File not found: %s - skipping!", fi->GetName());
2135  }
2137  }
2138  }
2139  // Make sure it will be sent back
2140  if (!GetOutput("MissingFiles")) {
2141  listOfMissingFiles->SetName("MissingFiles");
2142  AddOutputObject(listOfMissingFiles);
2143  }
2144  TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2145  if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2146 
2147  // Estimate how much data are missing
2148  Int_t ngood = dset->GetListOfElements()->GetSize();
2149  Int_t nbad = listOfMissingFiles->GetSize();
2150  Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2151  msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2152  " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2153  tmpStatus->Add(msg.Data());
2154  msg = Form(" +++\n"
2155  " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2156  " the 'MissingFiles' list\n"
2157  " +++", xb * 100., '%', nbad, nbad + ngood);
2159  } else {
2160  // Cleanup
2161  SafeDelete(listOfMissingFiles);
2162  }
2163  }
2164 
2165  // Done
2166  return 0;
2167 }
2168 
2169 ////////////////////////////////////////////////////////////////////////////////
2170 /// Process specified TDSet on PROOF.
2171 /// This method is called on client and on the PROOF master.
2172 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2173 /// in case of success.
2174 
2175 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2176  Option_t *option, Long64_t nentries,
2177  Long64_t first)
2178 {
2179  PDB(kGlobal,1) Info("Process", "Enter");
2180 
2181  fDSet = dset;
2183 
2184  if (!fProgressStatus) {
2185  Error("Process", "No progress status");
2186  return -1;
2187  }
2189 
2190  // delete fOutput;
2191  if (!fOutput)
2192  fOutput = new THashList;
2193  else
2194  fOutput->Clear();
2195 
2197 
2198  if (fProof->IsMaster()){
2200  } else {
2202  }
2203 
2204  TStopwatch elapsed;
2205 
2206  // Define filename
2207  TString fn;
2208  fSelectorFileName = selector_file;
2209 
2210  if (fCreateSelObj) {
2211  if(!SendSelector(selector_file)) return -1;
2212  fn = gSystem->BaseName(selector_file);
2213  } else {
2214  fn = selector_file;
2215  }
2216 
2217  TMessage mesg(kPROOF_PROCESS);
2218 
2219  // Parse option
2220  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2221 
2222  TList *inputtmp = 0; // List of temporary input objects
2223  TDSet *set = dset;
2224  if (fProof->IsMaster()) {
2225 
2226  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2227  set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2228  dset->GetDirectory() );
2229  if (dset->TestBit(TDSet::kEmpty))
2230  set->SetBit(TDSet::kEmpty);
2231 
2232  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2233  Error("Process", "cannot init the packetizer");
2235  return -1;
2236  }
2237 
2238  // Reset start, this is now managed by the packetizer
2239  first = 0;
2240 
2241  // Negative memlogfreq disable checks.
2242  // If 0 is passed we try to have 100 messages about memory
2243  // Otherwise we use the frequency passed.
2244  Int_t mrc = -1;
2245  Long64_t memlogfreq = -1, mlf;
2246  if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2247  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2248  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2249  }
2250  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2251  if (memlogfreq == 0) {
2252  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2253  if (memlogfreq <= 0) memlogfreq = 1;
2254  }
2255  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2256 
2257 
2258  // Send input data, if any
2259  TString emsg;
2260  if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2261  Warning("Process", "could not forward input data: %s", emsg.Data());
2262 
2263  // Attach to the transient histogram with the assigned packets, if required
2264  if (fInput->FindObject("PROOF_StatsHist") != 0) {
2265  if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2266  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2267  } else {
2268  PDB(kLoop,1)
2269  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2270  " packets being processed");
2271  }
2272  }
2273 
2274  } else {
2275 
2276  // Check whether we have to enforce the use of submergers
2277  if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2278  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2279  if (smg >= 0) {
2280  fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2281  if (gEnv->Lookup("Proof.MergersByHost")) {
2282  Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2283  if (mbh != 0) {
2284  // Administrator settings have the priority
2285  TObject *o = 0;
2286  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2287  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2288  }
2289  }
2290  }
2291  }
2292 
2293  // For a new query clients should make sure that the temporary
2294  // output list is empty
2295  if (fOutputLists) {
2296  fOutputLists->Delete();
2297  delete fOutputLists;
2298  fOutputLists = 0;
2299  }
2300 
2301  if (!sync) {
2303  Printf(" ");
2304  Info("Process","starting new query");
2305  }
2306 
2307  // Define fSelector in Client if processing with filename
2308  if (fCreateSelObj) {
2310  if (!(fSelector = TSelector::GetSelector(selector_file))) {
2311  if (!sync)
2312  gSystem->RedirectOutput(0);
2313  return -1;
2314  }
2315  }
2316 
2317  fSelectorClass = 0;
2318  fSelectorClass = fSelector->IsA();
2319 
2320  // Add fSelector to inputlist if processing with object
2321  if (!fCreateSelObj) {
2322  // In any input list was set into the selector move it to the PROOF
2323  // input list, because we do not want to stream the selector one
2324  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2325  TIter nxi(fSelector->GetInputList());
2326  TObject *o = 0;
2327  while ((o = nxi())) {
2328  if (!fInput->FindObject(o)) {
2329  fInput->Add(o);
2330  if (!inputtmp) {
2331  inputtmp = new TList;
2332  inputtmp->SetOwner(kFALSE);
2333  }
2334  inputtmp->Add(o);
2335  }
2336  }
2337  }
2338  fInput->Add(fSelector);
2339  }
2340  // Set the input list for initialization
2342  fSelector->SetOption(option);
2344 
2345  PDB(kLoop,1) Info("Process","Call Begin(0)");
2346  fSelector->Begin(0);
2347 
2348  // Reset the input list to avoid double streaming and related problems (saving
2349  // the TQueryResult)
2351 
2352  // Send large input data objects, if any
2354 
2355  if (!sync)
2356  gSystem->RedirectOutput(0);
2357  }
2358 
2359  TCleanup clean(this);
2360  SetupFeedback();
2361 
2362  TString opt = option;
2363 
2364  // Old servers need a dedicated streamer
2365  if (fProof->fProtocol < 13)
2366  dset->SetWriteV3(kTRUE);
2367 
2368  // Workers will get the entry ranges from the packetizer
2370  Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2371 
2372  // Entry- or Event- list ?
2373  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2374  : (TEntryList *)0;
2375  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2376  : (TEventList *)0;
2377  if (fProof->fProtocol > 14) {
2378  if (fProcessMessage) delete fProcessMessage;
2380  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2382  } else {
2383  mesg << set << fn << fInput << opt << num << fst << evl << sync;
2384  if (enl)
2385  // Not supported remotely
2386  Warning("Process","entry lists not supported by the server");
2387  }
2388 
2389  // Reset the merging progress information
2390  fProof->ResetMergePrg();
2391 
2392  Int_t nb = fProof->Broadcast(mesg);
2393  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2394  if (fProof->IsLite()) fProof->fNotIdle += nb;
2395 
2396  // Reset streamer choice
2397  if (fProof->fProtocol < 13)
2398  dset->SetWriteV3(kFALSE);
2399 
2400  // Redirect logs from master to special log frame
2401  if (IsClient())
2402  fProof->fRedirLog = kTRUE;
2403 
2404  if (!IsClient()){
2405  // Signal the start of finalize for the memory log grepping
2406  Info("Process|Svc", "Start merging Memory information");
2407  }
2408 
2409  if (!sync) {
2410  if (IsClient()) {
2411  // Asynchronous query: just make sure that asynchronous input
2412  // is enabled and return the prompt
2413  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2414  " activating CollectInputFrom");
2415  fProof->Activate();
2416 
2417  // Receive the acknowledgement and query sequential number
2418  fProof->Collect();
2419 
2420  return fProof->fSeqNum;
2421 
2422  } else {
2423  PDB(kGlobal,1) Info("Process","Calling Collect");
2424  fProof->Collect();
2425 
2426  HandleTimer(0); // force an update of final result
2427  // This forces a last call to TPacketizer::HandleTimer via the second argument
2428  // (the first is ignored). This is needed when some events were skipped so that
2429  // the total number of entries is not the one requested. The packetizer has no
2430  // way in such a case to understand that processing is finished: it must be told.
2431  if (fPacketizer) {
2433  // The progress timer will now stop itself at the next call
2435  // Store process info
2436  elapsed.Stop();
2437  if (fQuery)
2440  elapsed.RealTime());
2441  }
2442  StopFeedback();
2443 
2444  return Finalize(kFALSE,sync);
2445  }
2446  } else {
2447 
2448  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2449  fProof->Collect();
2450  if (!(fProof->IsSync())) {
2451  // The server required to switch to asynchronous mode
2452  Info("Process", "switching to the asynchronous mode ...");
2453  return fProof->fSeqNum;
2454  }
2455 
2456  // Restore prompt logging, for clients (Collect leaves things as they were
2457  // at the time it was called)
2458  if (IsClient())
2459  fProof->fRedirLog = kFALSE;
2460 
2461  if (!IsClient()) {
2462  // Force an update of final result
2463  HandleTimer(0);
2464  // This forces a last call to TPacketizer::HandleTimer via the second argument
2465  // (the first is ignored). This is needed when some events were skipped so that
2466  // the total number of entries is not the one requested. The packetizer has no
2467  // way in such a case to understand that processing is finished: it must be told.
2468  if (fPacketizer) {
2470  // The progress timer will now stop itself at the next call
2472  // Store process info
2473  if (fQuery)
2477  }
2478  } else {
2479  // Set the input list: maybe required at termination
2481  }
2482  StopFeedback();
2483 
2484  Long64_t rc = -1;
2486  rc = Finalize(kFALSE,sync);
2487 
2488  // Remove temporary input objects, if any
2489  if (inputtmp) {
2490  TIter nxi(inputtmp);
2491  TObject *o = 0;
2492  while ((o = nxi())) fInput->Remove(o);
2493  SafeDelete(inputtmp);
2494  }
2495 
2496  // Done
2497  return rc;
2498  }
2499 }
2500 
2501 ////////////////////////////////////////////////////////////////////////////////
2502 /// Process specified TDSet on PROOF.
2503 /// This method is called on client and on the PROOF master.
2504 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2505 /// in case of success.
2506 
2508  Option_t *option, Long64_t nentries,
2509  Long64_t first)
2510 {
2511  if (!selector) {
2512  Error("Process", "selector object undefined");
2513  return -1;
2514  }
2515 
2516  // Define fSelector in Client
2517  if (IsClient() && (selector != fSelector)) {
2519  fSelector = selector;
2520  }
2521 
2523  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2524  fCreateSelObj = kTRUE;
2525 
2526  // Done
2527  return rc;
2528 }
2529 
2530 ////////////////////////////////////////////////////////////////////////////////
2531 /// Prepares the given list of new workers to join a progressing process.
2532 /// Returns kTRUE on success, kFALSE otherwise.
2533 
2535 {
2536  if (!fProcessMessage || !fProof || !fPacketizer) {
2537  Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2539  return kFALSE;
2540  }
2541 
2542  if (!workers || !fProof->IsMaster()) {
2543  Error("Process", "Invalid call");
2544  return kFALSE;
2545  }
2546 
2547  PDB(kGlobal, 1)
2548  Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2549 
2550  // Sends the file associated to the TSelector, if necessary
2551  if (fCreateSelObj) {
2552  PDB(kGlobal, 2)
2553  Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2555  Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2556  return kFALSE;
2557  }
2558  }
2559 
2560  if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2561 
2562  PDB(kGlobal, 2)
2563  Info("Process", "Adding new workers to the packetizer");
2564  if (fPacketizer->AddWorkers(workers) == -1) {
2565  Error("Process", "Cannot add new workers to the packetizer!");
2566  return kFALSE; // TODO: make new wrks inactive
2567  }
2568 
2569  PDB(kGlobal, 2)
2570  Info("Process", "Broadcasting process message to new workers");
2571  fProof->Broadcast(*fProcessMessage, workers);
2572 
2573  // Don't call Collect(): we came here from a global Collect() already which
2574  // will take care of new workers as well
2575 
2576  return kTRUE;
2577 
2578 }
2579 
2580 ////////////////////////////////////////////////////////////////////////////////
2581 /// Merge output in files
2582 
2584 {
2585  PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2586  PDB(kOutput,2) fOutput->ls();
2587 
2588  TList *rmList = 0;
2589  if (fMergeFiles) {
2590  TIter nxo(fOutput);
2591  TObject *o = 0;
2592  TProofOutputFile *pf = 0;
2593  while ((o = nxo())) {
2594  if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2595 
2596  PDB(kOutput,2) pf->Print();
2597 
2598  if (pf->IsMerge()) {
2599 
2600  // Point to the merger
2601  Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2602  TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2603  if (!filemerger) {
2604  Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2605  pf->Print();
2606  continue;
2607  }
2608  // If only one instance the list in the merger is not yet created: do it now
2609  if (!pf->IsMerged()) {
2610  PDB(kOutput,2) pf->Print();
2611  TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2612  filemerger->AddFile(fileLoc);
2613  }
2614  // Datadir
2615  TString ddir, ddopts;
2616  if (gProofServ) {
2617  ddir.Form("%s/", gProofServ->GetDataDir());
2619  }
2620  // Set the output file
2621  TString outfile(pf->GetOutputFileName());
2622  if (outfile.Contains("<datadir>/")) {
2623  outfile.ReplaceAll("<datadir>/", ddir.Data());
2624  if (!ddopts.IsNull())
2625  outfile += TString::Format("?%s", ddopts.Data());
2626  pf->SetOutputFileName(outfile);
2627  }
2628  if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2630  TString srv;
2632  TUrl usrv(srv);
2633  Bool_t localFile = kFALSE;
2634  if (pf->IsRetrieve()) {
2635  // This file will be retrieved by the client: we created it in the data dir
2636  // and save the file URL on the client in the title
2637  if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2638  TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2639  // The output file path on the master
2640  outfile.Form("%s%s", ddir.Data(), bn.Data());
2641  // Save the client path in the title if not defined yet
2642  if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2643  // The file is local
2644  localFile = kTRUE;
2645  } else {
2646  // Check if the file is on the master or elsewhere
2647  if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2648  // Check locality
2649  TUrl uof(outfile.Data(), kTRUE);
2650  TString lfn;
2651  ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2652  if (ftyp == TFile::kLocal && !srv.IsNull()) {
2653  // Check if is a different server
2654  if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2655  usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2656  }
2657  // If it is really local set the file name
2658  if (ftyp == TFile::kLocal) outfile = lfn;
2659  // The file maybe local
2660  if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2661  }
2662  // The remote output file name (the one to be used by the client)
2663  TString outfilerem(outfile);
2664  // For local files we add the local server
2665  if (localFile) {
2666  // Remove prefix, if any, if included and if Xrootd
2667  TProofServ::FilterLocalroot(outfilerem, srv);
2668  outfilerem.Insert(0, srv);
2669  }
2670  // Save the new remote output filename
2671  pf->SetOutputFileName(outfilerem);
2672  // Align the filename
2673  pf->SetFileName(gSystem->BaseName(outfilerem));
2674  }
2675  if (!filemerger->OutputFile(outfile)) {
2676  Error("MergeOutputFiles", "cannot open the output file");
2677  continue;
2678  }
2679  // Merge
2680  PDB(kSubmerger,2) filemerger->PrintFiles("");
2681  if (!filemerger->Merge()) {
2682  Error("MergeOutputFiles", "cannot merge the output files");
2683  continue;
2684  }
2685  // Remove the files
2686  TList *fileList = filemerger->GetMergeList();
2687  if (fileList) {
2688  TIter next(fileList);
2689  TObjString *url = 0;
2690  while((url = (TObjString*)next())) {
2691  TUrl u(url->GetName());
2692  if (!strcmp(u.GetProtocol(), "file")) {
2693  gSystem->Unlink(u.GetFile());
2694  } else {
2695  gSystem->Unlink(url->GetName());
2696  }
2697  }
2698  }
2699  // Reset the merger
2700  filemerger->Reset();
2701 
2702  } else {
2703 
2704  // If not yet merged (for example when having only 1 active worker,
2705  // we need to create the dataset by calling Merge on an effectively empty list
2706  if (!pf->IsMerged()) {
2707  TList dumlist;
2708  dumlist.Add(new TNamed("dum", "dum"));
2709  dumlist.SetOwner(kTRUE);
2710  pf->Merge(&dumlist);
2711  }
2712  // Point to the dataset
2714  if (!fc) {
2715  Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2716  pf->Print();
2717  continue;
2718  }
2719  // Add the collection to the output list for registration and/or to be returned
2720  // to the client
2721  fOutput->Add(fc);
2722  // Do not cleanup at destruction
2723  pf->ResetFileCollection();
2724  // Tell the main thread to register this dataset, if needed
2725  if (pf->IsRegister()) {
2726  TString opt;
2727  if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2728  if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2729  if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2730  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2731  TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2732  fOutput->Add(new TNamed(tag, opt));
2733  }
2734  // Remove this object from the output list and schedule it for distruction
2735  fOutput->Remove(pf);
2736  if (!rmList) rmList = new TList;
2737  rmList->Add(pf);
2738  PDB(kOutput,2) fOutput->Print();
2739  }
2740  }
2741  }
2742  }
2743 
2744  // Remove objects scheduled for removal
2745  if (rmList && rmList->GetSize() > 0) {
2746  TIter nxo(rmList);
2747  TObject *o = 0;
2748  while((o = nxo())) {
2749  fOutput->Remove(o);
2750  }
2751  rmList->SetOwner(kTRUE);
2752  delete rmList;
2753  }
2754 
2755  PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2756 
2757  // Done
2758  return kTRUE;
2759 }
2760 
2761 
2762 ////////////////////////////////////////////////////////////////////////////////
2763 /// Set the selector's data members:
2764 /// find the mapping of data members to otuput list entries in the output list
2765 /// and apply it.
2766 
2768 {
2771  if (!olsdm) {
2772  PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2773  "failed to find map object in output list!");
2774  return;
2775  }
2776 
2777  olsdm->SetDataMembers(fSelector);
2778 }
2779 
2780 ////////////////////////////////////////////////////////////////////////////////
2781 
2783 {
2784  // Finalize a query.
2785  // Returns -1 in case of an error, 0 otherwise.
2786 
2787  if (IsClient()) {
2788  if (fOutputLists == 0) {
2789  if (force)
2790  if (fQuery)
2791  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2792  fQuery->GetName()), force);
2793  } else {
2794  // Make sure the all objects are in the output list
2795  PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2796  MergeOutput();
2797  }
2798  }
2799 
2800  Long64_t rv = 0;
2801  if (fProof->IsMaster()) {
2802 
2803  // Fill information for monitoring and stop it
2804  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2805  if (!status) {
2806  // The query was aborted: let's add some info in the output list
2807  status = new TStatus();
2808  fOutput->Add(status);
2809  TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2810  status->Add(emsg);
2811  }
2812  status->SetExitStatus((Int_t) GetExitStatus());
2813 
2814  PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2815  // Some objects (e.g. histos in autobin) may not have been merged yet
2816  // do it now
2817  MergeOutput();
2818 
2819  fOutput->SetOwner();
2820 
2821  // Add the active-wrks-vs-proctime info from the packetizer
2822  if (fPacketizer) {
2824  if (pperf) fOutput->Add(pperf);
2826  if (parms) {
2827  TIter nxo(parms);
2828  TObject *o = 0;
2829  while ((o = nxo())) fOutput->Add(o);
2830  }
2831 
2832  // If other invalid elements were found during processing, add them to the
2833  // list of missing elements
2834  TDSetElement *elem = 0;
2835  if (fPacketizer->GetFailedPackets()) {
2837  TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2838  if (!listOfMissingFiles) {
2839  listOfMissingFiles = new TList;
2840  listOfMissingFiles->SetName("MissingFiles");
2841  }
2843  while ((elem = (TDSetElement *)nxe()))
2844  listOfMissingFiles->Add(elem->GetFileInfo(type));
2845  if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2846  }
2847  }
2848 
2849  TPerfStats::Stop();
2850  // Save memory usage on master
2851  Long_t vmaxmst, rmaxmst;
2852  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2853  status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2854 
2856 
2857  } else {
2858  if (fExitStatus != kAborted) {
2859 
2860  if (!sync) {
2861  // Reinit selector (with multi-sessioning we must do this until
2862  // TSelector::GetSelector() is optimized to i) avoid reloading of an
2863  // unchanged selector and ii) invalidate existing instances of
2864  // reloaded selector)
2865  if (ReinitSelector(fQuery) == -1) {
2866  Info("Finalize", "problems reinitializing selector \"%s\"",
2867  fQuery->GetSelecImp()->GetName());
2868  return -1;
2869  }
2870  }
2871 
2872  if (fPacketizer)
2873  if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2875  failedPackets->SetName("FailedPackets");
2876  AddOutputObject(failedPackets);
2877 
2878  TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2879  if (!status) AddOutputObject((status = new TStatus()));
2880  status->Add("Some packets were not processed! Check the the"
2881  " 'FailedPackets' list in the output list");
2882  }
2883 
2884  // Some input parameters may be needed in Terminate
2886 
2888  if (output) {
2889  TIter next(fOutput);
2890  while(TObject* obj = next()) {
2891  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2892  // Either parallel or not a canvas or not able to display it:
2893  // just add to the list
2894  output->Add(obj);
2895  }
2896  } else {
2897  Warning("Finalize", "undefined output list in the selector! Protocol error?");
2898  }
2899 
2900  // We need to do this because the output list can be modified in TSelector::Terminate
2901  // in a way to invalidate existing objects; so we clean the links when still valid and
2902  // we re-copy back later
2904  fOutput->Clear("nodelete");
2905 
2906  // Map output objects to selector members
2908 
2909  PDB(kLoop,1) Info("Finalize","Call Terminate()");
2910  // This is the end of merging
2911  SetMerging(kFALSE);
2912  // We measure the merge time
2913  fProof->fQuerySTW.Reset();
2914  // Call Terminate now
2915  fSelector->Terminate();
2916 
2917  rv = fSelector->GetStatus();
2918 
2919  // Copy the output list back and clean the selector's list
2920  TIter it(output);
2921  while(TObject* o = it()) {
2922  fOutput->Add(o);
2923  }
2924 
2925  // Save the output list in the current query, if any
2926  if (fQuery) {
2928  // Set in finalized state (cannot be done twice)
2929  fQuery->SetFinalized();
2930  } else {
2931  Warning("Finalize","current TQueryResult object is undefined!");
2932  }
2933 
2934  if (!fCreateSelObj) {
2937  if (output) output->Remove(fSelector);
2938  fSelector = 0;
2939  }
2940 
2941  // We have transferred copy of the output objects in TQueryResult,
2942  // so now we can cleanup the selector, making sure that we do not
2943  // touch the output objects
2944  if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2946 
2947  // Delete fOutput (not needed anymore, cannot be finalized twice),
2948  // making sure that the objects saved in TQueryResult are not deleted
2950  fOutput->Clear("nodelete");
2952 
2953  } else {
2954 
2955  // Cleanup
2956  fOutput->SetOwner();
2958  if (!fCreateSelObj) fSelector = 0;
2959  }
2960  }
2961  PDB(kGlobal,1) Info("Process","exit");
2962 
2963  if (!IsClient()) {
2964  Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2965  }
2967 
2968  return rv;
2969 }
2970 
2971 ////////////////////////////////////////////////////////////////////////////////
2972 /// Finalize the results of a query already processed.
2973 
2975 {
2976  PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2977 
2978  if (!IsClient()) {
2979  Info("Finalize(TQueryResult *)",
2980  "method to be executed only on the clients");
2981  return -1;
2982  }
2983 
2984  if (!qr) {
2985  Info("Finalize(TQueryResult *)", "query undefined");
2986  return -1;
2987  }
2988 
2989  if (qr->IsFinalized()) {
2990  Info("Finalize(TQueryResult *)", "query already finalized");
2991  return -1;
2992  }
2993 
2994  // Reset the list
2995  if (!fOutput)
2996  fOutput = new THashList;
2997  else
2998  fOutput->Clear();
2999 
3000  // Make sure that the temporary output list is empty
3001  if (fOutputLists) {
3002  fOutputLists->Delete();
3003  delete fOutputLists;
3004  fOutputLists = 0;
3005  }
3006 
3007  // Re-init the selector
3009 
3010  // Import the output list
3011  TList *tmp = (TList *) qr->GetOutputList();
3012  if (!tmp) {
3013  gSystem->RedirectOutput(0);
3014  Info("Finalize(TQueryResult *)", "outputlist is empty");
3015  return -1;
3016  }
3017  TList *out = fOutput;
3018  if (fProof->fProtocol < 11)
3019  out = new TList;
3020  TIter nxo(tmp);
3021  TObject *o = 0;
3022  while ((o = nxo()))
3023  out->Add(o->Clone());
3024 
3025  // Adopts the list
3026  if (fProof->fProtocol < 11) {
3027  out->SetOwner();
3028  StoreOutput(out);
3029  }
3030  gSystem->RedirectOutput(0);
3031 
3033 
3034  // Finalize it
3035  SetCurrentQuery(qr);
3036  Long64_t rc = Finalize();
3038 
3039  return rc;
3040 }
3041 
3042 ////////////////////////////////////////////////////////////////////////////////
3043 /// Send the selector file(s) to master or worker nodes.
3044 
3045 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
3046 {
3047  // Check input
3048  if (!selector_file) {
3049  Info("SendSelector", "Invalid input: selector (file) name undefined");
3050  return kFALSE;
3051  }
3052 
3053  if (!strchr(gSystem->BaseName(selector_file), '.')) {
3054  if (gDebug > 1)
3055  Info("SendSelector", "selector name '%s' does not contain a '.':"
3056  " nothing to send, it will be loaded from a library", selector_file);
3057  return kTRUE;
3058  }
3059 
3060  // Extract the fine name first
3061  TString selec = selector_file;
3062  TString aclicMode;
3063  TString arguments;
3064  TString io;
3065  selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3066 
3067  // Expand possible envs or '~'
3068  gSystem->ExpandPathName(selec);
3069 
3070  // Update the macro path
3072  TString np = gSystem->GetDirName(selec);
3073  if (!np.IsNull()) {
3074  np += ":";
3075  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3076  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3077  mp.Insert(ip, np);
3078  TROOT::SetMacroPath(mp);
3079  if (gDebug > 0)
3080  Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3081  }
3082  }
3083 
3084  // Header file
3085  TString header = selec;
3086  header.Remove(header.Last('.'));
3087  header += ".h";
3088  if (gSystem->AccessPathName(header, kReadPermission)) {
3089  TString h = header;
3090  header.Remove(header.Last('.'));
3091  header += ".hh";
3092  if (gSystem->AccessPathName(header, kReadPermission)) {
3093  Info("SendSelector",
3094  "header file not found: tried: %s %s", h.Data(), header.Data());
3095  return kFALSE;
3096  }
3097  }
3098 
3099  // Send files now
3101  Info("SendSelector", "problems sending implementation file %s", selec.Data());
3102  return kFALSE;
3103  }
3104  if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3105  Info("SendSelector", "problems sending header file %s", header.Data());
3106  return kFALSE;
3107  }
3108 
3109  return kTRUE;
3110 }
3111 
3112 ////////////////////////////////////////////////////////////////////////////////
3113 /// Merge objects in output the lists.
3114 
3116 {
3117  PDB(kOutput,1) Info("MergeOutput","Enter");
3118 
3119  TObject *obj = 0;
3120  if (fOutputLists) {
3121 
3122  TIter next(fOutputLists);
3123 
3124  TList *list;
3125  while ( (list = (TList *) next()) ) {
3126 
3127  if (!(obj = fOutput->FindObject(list->GetName()))) {
3128  obj = list->First();
3129  list->Remove(obj);
3130  fOutput->Add(obj);
3131  }
3132 
3133  if ( list->IsEmpty() ) continue;
3134 
3135  TMethodCall callEnv;
3136  if (obj->IsA())
3137  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3138  if (callEnv.IsValid()) {
3139  callEnv.SetParam((Long_t) list);
3140  callEnv.Execute(obj);
3141  } else {
3142  // No Merge interface, return individual objects
3143  while ( (obj = list->First()) ) {
3144  fOutput->Add(obj);
3145  list->Remove(obj);
3146  }
3147  }
3148  }
3150 
3151  } else {
3152 
3153  PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3154  }
3155 
3156  if (!IsClient() || fProof->IsLite()) {
3157  // Merge the output files created on workers, if any
3158  MergeOutputFiles();
3159  }
3160 
3161  // If there are TProofOutputFile objects we have to make sure that the internal
3162  // information is consistent for the cases where this object is going to be merged
3163  // again (e.g. when using submergers or in a multi-master setup). This may not be
3164  // the case because the first coming in is taken as reference and it has the
3165  // internal dir and raw dir of the originating worker.
3166  TString key;
3167  TNamed *nm = 0;
3168  TList rmlist;
3169  TIter nxo(fOutput);
3170  while ((obj = nxo())) {
3171  TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3172  if (pf) {
3173  if (gProofServ) {
3174  PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3175  TString dir(pf->GetOutputFileName());
3176  PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3177  // The dir
3178  if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3179  PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3180  pf->SetDir(dir);
3181  // The raw dir; for xrootd based system we include the 'localroot', if any
3182  TUrl u(dir);
3183  dir = u.GetFile();
3184  TString pfx = gEnv->GetValue("Path.Localroot","");
3185  if (!pfx.IsNull() &&
3186  (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3187  dir.Insert(0, pfx);
3188  PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3189  pf->SetDir(dir, kTRUE);
3190  // The worker ordinal
3192  // The saved output file name, if any
3193  key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3194  if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3195  pf->SetOutputFileName(nm->GetTitle());
3196  rmlist.Add(nm);
3198  pf->SetOutputFileName(0);
3200  }
3201  // The filename (order is important to exclude '.merger' from the key)
3202  dir = pf->GetFileName();
3204  dir += ".merger";
3205  pf->SetMerged(kFALSE);
3206  } else {
3207  if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3208  }
3209  pf->SetFileName(dir);
3210  } else if (fProof->IsLite()) {
3211  // The ordinal
3212  pf->SetWorkerOrdinal("0");
3213  // The dir
3215  // The filename and raw dir
3216  TUrl u(pf->GetOutputFileName(), kTRUE);
3217  pf->SetFileName(gSystem->BaseName(u.GetFile()));
3218  pf->SetDir(gSystem->GetDirName(u.GetFile()), kTRUE);
3219  // Notify the output path
3220  Printf("\nOutput file: %s", pf->GetOutputFileName());
3221  }
3222  } else {
3223  PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3224  }
3225  }
3226 
3227  // Remove temporary objects from fOutput
3228  if (rmlist.GetSize() > 0) {
3229  TIter nxrm(&rmlist);
3230  while ((obj = nxrm()))
3231  fOutput->Remove(obj);
3232  rmlist.SetOwner(kTRUE);
3233  }
3234 
3235  // If requested (typically in case of submerger to count possible side-effects in that process)
3236  // save the measured memory usage
3237  if (saveMemValues) {
3238  TPerfStats::Stop();
3239  // Save memory usage on master
3240  Long_t vmaxmst, rmaxmst;
3241  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3242  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3243  if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3244  }
3245 
3246  PDB(kOutput,1) fOutput->Print();
3247  PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3248 }
3249 
3250 ////////////////////////////////////////////////////////////////////////////////
3251 /// Progress signal.
3252 
3254 {
3255  if (IsClient()) {
3256  fProof->Progress(total, processed);
3257  } else {
3258  // Send to the previous tier
3260  m << total << processed;
3261  gProofServ->GetSocket()->Send(m);
3262  }
3263 }
3264 
3265 ////////////////////////////////////////////////////////////////////////////////
3266 /// Progress signal.
3267 
3269  Long64_t bytesread,
3270  Float_t initTime, Float_t procTime,
3271  Float_t evtrti, Float_t mbrti)
3272 {
3273  PDB(kGlobal,1)
3274  Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3275  initTime, procTime, evtrti, mbrti);
3276 
3277  if (IsClient()) {
3278  fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3279  } else {
3280  // Send to the previous tier
3282  m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3283  gProofServ->GetSocket()->Send(m);
3284  }
3285 }
3286 
3287 ////////////////////////////////////////////////////////////////////////////////
3288 /// Progress signal.
3289 
3291 {
3292  if (pi) {
3293  PDB(kGlobal,1)
3294  Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3295  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3296  pi->fActWorkers, pi->fEffSessions);
3297 
3298  if (IsClient()) {
3299  fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3300  pi->fInitTime, pi->fProcTime,
3301  pi->fEvtRateI, pi->fMBRateI,
3302  pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3303  } else {
3304  // Send to the previous tier
3306  m << pi;
3307  gProofServ->GetSocket()->Send(m);
3308  }
3309  } else {
3310  Warning("Progress","TProofProgressInfo object undefined!");
3311  }
3312 }
3313 
3314 
3315 ////////////////////////////////////////////////////////////////////////////////
3316 /// Feedback signal.
3317 
3319 {
3320  fProof->Feedback(objs);
3321 }
3322 
3323 ////////////////////////////////////////////////////////////////////////////////
3324 /// Stop process after this event.
3325 
3327 {
3328  if (fPacketizer != 0)
3329  fPacketizer->StopProcess(abort, kFALSE);
3330  if (abort == kTRUE)
3332  else
3334 }
3335 
3336 ////////////////////////////////////////////////////////////////////////////////
3337 /// Incorporate the received object 'obj' into the output list fOutput.
3338 /// The latter is created if not existing.
3339 /// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3340 /// consumption.
3341 /// Returns -1 in case of error, 1 if the object has been merged into another
3342 /// one (so that its ownership has not been taken and can be deleted), and 0
3343 /// otherwise.
3344 
3346 {
3347  PDB(kOutput,1)
3348  Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3349 
3350  // We must something to process
3351  if (!obj) {
3352  PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3353  return -1;
3354  }
3355 
3356  // Create the output list, if not yet done
3357  if (!fOutput)
3358  fOutput = new THashList;
3359 
3360  // Flag about merging
3361  Bool_t merged = kTRUE;
3362 
3363  // Process event lists first
3364  TList *elists = dynamic_cast<TList *> (obj);
3365  if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3366 
3367  // Create a global event list, result of merging the event lists
3368  // coresponding to the various data set elements
3369  TEventList *evlist = new TEventList("PROOF_EventList");
3370 
3371  // Iterate the list of event list segments
3372  TIter nxevl(elists);
3373  TEventList *evl = 0;
3374  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3375 
3376  // Find the file offset (fDSet is the current TDSet instance)
3377  // locating the element by name
3378  TIter nxelem(fDSet->GetListOfElements());
3379  TDSetElement *elem = 0;
3380  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3381  if (!strcmp(elem->GetFileName(), evl->GetName()))
3382  break;
3383  }
3384  if (!elem) {
3385  Error("AddOutputObject", "Found an event list for %s, but no object with"
3386  " the same name in the TDSet", evl->GetName());
3387  continue;
3388  }
3389  Long64_t offset = elem->GetTDSetOffset();
3390 
3391  // Shift the list by the number of first event in that file
3392  Long64_t *arr = evl->GetList();
3393  Int_t num = evl->GetN();
3394  if (arr && offset > 0)
3395  for (Int_t i = 0; i < num; i++)
3396  arr[i] += offset;
3397 
3398  // Add to the global event list
3399  evlist->Add(evl);
3400  }
3401 
3402  // Incorporate the resulting global list in fOutput
3403  SetLastMergingMsg(evlist);
3404  Incorporate(evlist, fOutput, merged);
3405  NotifyMemory(evlist);
3406 
3407  // Delete the global list if merged
3408  if (merged)
3409  SafeDelete(evlist);
3410 
3411  // The original object has been transformed in something else; we do
3412  // not have ownership on it
3413  return 1;
3414  }
3415 
3416  // Check if we need to merge files
3417  TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3418  if (pf) {
3419  fMergeFiles = kTRUE;
3420  if (!IsClient() || fProof->IsLite()) {
3421  if (pf->IsMerge()) {
3422  Bool_t hasfout = (pf->GetOutputFileName() &&
3423  strlen(pf->GetOutputFileName()) > 0 &&
3425  Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3426  if (setfout) {
3427 
3428  TString ddir, ddopts;
3429  if (gProofServ) {
3430  ddir.Form("%s/", gProofServ->GetDataDir());
3431  if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3432  }
3433  // Set the output file
3434  TString outfile(pf->GetOutputFileName());
3435  outfile.ReplaceAll("<datadir>/", ddir.Data());
3436  if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3437  pf->SetOutputFileName(outfile);
3438 
3439  if (gProofServ) {
3440  // If submerger, save first the existing filename, if any
3441  if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3442  TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3443  if (!fOutput->FindObject(key.Data()))
3444  fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3445  }
3446  TString of;
3448  if (of.IsNull()) {
3449  // Assume an xroot server running on the machine
3450  of.Form("root://%s/", gSystem->HostName());
3451  if (gSystem->Getenv("XRDPORT")) {
3452  TString sp(gSystem->Getenv("XRDPORT"));
3453  if (sp.IsDigit())
3454  of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3455  }
3456  }
3457  TString sessionPath(gProofServ->GetSessionDir());
3458  TProofServ::FilterLocalroot(sessionPath, of);
3459  of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3461  if (!of.EndsWith(".merger")) of += ".merger";
3462  } else {
3463  if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3464  }
3465  pf->SetOutputFileName(of);
3466  }
3467  }
3468  // Notify
3469  PDB(kOutput, 1) pf->Print();
3470  }
3471  } else {
3472  // On clients notify the output path
3473  Printf("Output file: %s", pf->GetOutputFileName());
3474  }
3475  }
3476 
3477  // For other objects we just run the incorporation procedure
3478  SetLastMergingMsg(obj);
3479  Incorporate(obj, fOutput, merged);
3480  NotifyMemory(obj);
3481 
3482  // We are done
3483  return (merged ? 1 : 0);
3484 }
3485 
3486 ////////////////////////////////////////////////////////////////////////////////
3487 /// Control output redirection to TProof::fLogFileW
3488 
3490 {
3491  if (on && fProof && fProof->fLogFileW) {
3494  } else if (!on) {
3495  if (fErrorHandler) {
3498  }
3499  }
3500 }
3501 
3502 ////////////////////////////////////////////////////////////////////////////////
3503 /// Incorporate the content of the received output list 'out' into the final
3504 /// output list fOutput. The latter is created if not existing.
3505 /// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3506 /// consumption.
3507 
3509 {
3510  PDB(kOutput,1) Info("AddOutput","Enter");
3511 
3512  // We must something to process
3513  if (!out) {
3514  PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3515  return;
3516  }
3517 
3518  // Create the output list, if not yet done
3519  if (!fOutput)
3520  fOutput = new THashList;
3521 
3522  // Process event lists first
3523  Bool_t merged = kTRUE;
3524  TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3525  if (elists) {
3526 
3527  // Create a global event list, result of merging the event lists
3528  // corresponding to the various data set elements
3529  TEventList *evlist = new TEventList("PROOF_EventList");
3530 
3531  // Iterate the list of event list segments
3532  TIter nxevl(elists);
3533  TEventList *evl = 0;
3534  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3535 
3536  // Find the file offset (fDSet is the current TDSet instance)
3537  // locating the element by name
3538  TIter nxelem(fDSet->GetListOfElements());
3539  TDSetElement *elem = 0;
3540  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3541  if (!strcmp(elem->GetFileName(), evl->GetName()))
3542  break;
3543  }
3544  if (!elem) {
3545  Error("AddOutput", "Found an event list for %s, but no object with"
3546  " the same name in the TDSet", evl->GetName());
3547  continue;
3548  }
3549  Long64_t offset = elem->GetTDSetOffset();
3550 
3551  // Shift the list by the number of first event in that file
3552  Long64_t *arr = evl->GetList();
3553  Int_t num = evl->GetN();
3554  if (arr && offset > 0)
3555  for (Int_t i = 0; i < num; i++)
3556  arr[i] += offset;
3557 
3558  // Add to the global event list
3559  evlist->Add(evl);
3560  }
3561 
3562  // Remove and delete the events lists object to avoid spoiling iteration
3563  // during next steps
3564  out->Remove(elists);
3565  delete elists;
3566 
3567  // Incorporate the resulting global list in fOutput
3568  SetLastMergingMsg(evlist);
3569  Incorporate(evlist, fOutput, merged);
3570  NotifyMemory(evlist);
3571  }
3572 
3573  // Iterate on the remaining objects in the received list
3574  TIter nxo(out);
3575  TObject *obj = 0;
3576  while ((obj = nxo())) {
3577  SetLastMergingMsg(obj);
3578  Incorporate(obj, fOutput, merged);
3579  // If not merged, drop from the temporary list, as the ownership
3580  // passes to fOutput
3581  if (!merged)
3582  out->Remove(obj);
3583  NotifyMemory(obj);
3584  }
3585 
3586  // Done
3587  return;
3588 }
3589 
3590 ////////////////////////////////////////////////////////////////////////////////
3591 /// Printout the memory record after merging object 'obj'
3592 /// This record is used by the memory monitor
3593 
3595 {
3596  if (fProof && (!IsClient() || fProof->IsLite())){
3597  ProcInfo_t pi;
3598  if (!gSystem->GetProcInfo(&pi)){
3599  // For PROOF-Lite we redirect this output to a the open log file so that the
3600  // memory monitor can pick these messages up
3602  Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3603  pi.fMemVirtual, pi.fMemResident, obj->GetName());
3604  RedirectOutput(0);
3605  }
3606  // Record also values for monitoring
3608  }
3609 }
3610 
3611 ////////////////////////////////////////////////////////////////////////////////
3612 /// Set the message to be notified in case of exception
3613 
3615 {
3616  TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3617  TProofServ::SetLastMsg(lastMsg);
3618 }
3619 
3620 ////////////////////////////////////////////////////////////////////////////////
3621 /// Incorporate object 'newobj' in the list 'outlist'.
3622 /// The object is merged with an object of the same name already existing in
3623 /// the list, or just added.
3624 /// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3625 /// this happens if the Merge() method does not exist or if a object named as 'obj'
3626 /// is not already in the list. If the obj is not 'merged' than it should not be
3627 /// deleted, unless outlist is not owner of its objects.
3628 /// Return 0 on success, -1 on error.
3629 
3631 {
3632  merged = kTRUE;
3633 
3634  PDB(kOutput,1)
3635  Info("Incorporate", "enter: obj: %p (%s), list: %p",
3636  newobj, newobj ? newobj->ClassName() : "undef", outlist);
3637 
3638  // The object and list must exist
3639  if (!newobj || !outlist) {
3640  Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3641  return -1;
3642  }
3643 
3644  // Special treatment for histograms in autobin mode
3645  Bool_t specialH =
3647  if (specialH && newobj->InheritsFrom(TH1::Class())) {
3648  if (!HandleHistogram(newobj, merged)) {
3649  if (merged) {
3650  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3651  } else {
3652  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3653  " appropriate list for delayed merging", newobj->GetName());
3654  }
3655  return 0;
3656  }
3657  }
3658 
3659  // Check if an object with the same name exists already
3660  TObject *obj = outlist->FindObject(newobj->GetName());
3661 
3662  // If no, add the new object and return
3663  if (!obj) {
3664  outlist->Add(newobj);
3665  merged = kFALSE;
3666  // Done
3667  return 0;
3668  }
3669 
3670  // Locate the Merge(TCollection *) method
3671  TMethodCall callEnv;
3672  if (obj->IsA())
3673  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3674  if (callEnv.IsValid()) {
3675  // Found: put the object in a one-element list
3676  static TList *xlist = new TList;
3677  xlist->Add(newobj);
3678  // Call the method
3679  callEnv.SetParam((Long_t) xlist);
3680  callEnv.Execute(obj);
3681  // Ready for next call
3682  xlist->Clear();
3683  } else {
3684  // Not found: return individual objects
3685  outlist->Add(newobj);
3686  merged = kFALSE;
3687  }
3688 
3689  // Done
3690  return 0;
3691 }
3692 
3693 ////////////////////////////////////////////////////////////////////////////////
3694 /// Low statistic histograms need a special treatment when using autobin
3695 
3697 {
3698  TH1 *h = dynamic_cast<TH1 *>(obj);
3699  if (!h) {
3700  // Not an histo
3701  return obj;
3702  }
3703 
3704  // This is only used if we return (TObject *)0 and there is only one case
3705  // when we set this to kTRUE
3706  merged = kFALSE;
3707 
3708  // Does is still needs binning ?
3709  Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3710 
3711  // Number of entries
3712  Int_t nent = h->GetBufferLength();
3713  PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3714  h->GetName(), nent, h->GetBufferSize());
3715 
3716  // Attach to the list in the outputlists, if any
3717  TList *list = 0;
3718  if (!fOutputLists) {
3719  PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3720  fOutputLists = new TList;
3722  }
3723  list = (TList *) fOutputLists->FindObject(h->GetName());
3724 
3725  TH1 *href = 0;
3726  if (tobebinned) {
3727 
3728  // The histogram needs to be projected in a reasonable range: we
3729  // do this at the end with all the histos, so we need to create
3730  // a list here
3731  if (!list) {
3732  // Create the list
3733  list = new TList;
3734  list->SetName(h->GetName());
3735  list->SetOwner();
3736  fOutputLists->Add(list);
3737  // Move in it any previously merged object from the output list
3738  if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3739  fOutput->Remove(href);
3740  list->Add(href);
3741  }
3742  }
3743  TIter nxh(list);
3744  while ((href = (TH1 *) nxh())) {
3745  if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3746  }
3747  if (href) {
3748  list->AddBefore(href, h);
3749  } else {
3750  list->Add(h);
3751  }
3752  // Done
3753  return (TObject *)0;
3754 
3755  } else {
3756 
3757  if (list) {
3758  TIter nxh(list);
3759  while ((href = (TH1 *) nxh())) {
3760  if (href->GetBuffer() || href->GetEntries() < nent) break;
3761  }
3762  if (href) {
3763  list->AddBefore(href, h);
3764  } else {
3765  list->Add(h);
3766  }
3767  // Done
3768  return (TObject *)0;
3769 
3770  } else {
3771  // Check if we can 'Add' the histogram to an existing one; this is more efficient
3772  // then using Merge
3773  TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3774  if (hout) {
3775  // Remove the existing histo from the output list ...
3776  fOutput->Remove(hout);
3777  // ... and create either the list to merge in one-go at the end
3778  // (more efficient than merging one by one) or, if too big, merge
3779  // these two and start the 'one-by-one' technology
3780  Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3781  if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
3782  list = new TList;
3783  list->Add(hout);
3784  h->Merge(list);
3785  list->SetOwner();
3786  delete list;
3787  return h;
3788  } else {
3789  list = new TList;
3790  list->SetName(h->GetName());
3791  list->SetOwner();
3792  fOutputLists->Add(list);
3793  // Add the existing and the incoming histos
3794  list->Add(hout);
3795  list->Add(h);
3796  // Done
3797  return (TObject *)0;
3798  }
3799  } else {
3800  // This is the first one; add it to the output list
3801  fOutput->Add(h);
3802  return (TObject *)0;
3803  }
3804  }
3805  }
3806 }
3807 
3808 ////////////////////////////////////////////////////////////////////////////////
3809 /// Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges
3810 /// on the axis (i.e. if they can be just Add-ed for merging).
3811 
3813 {
3814  Bool_t rc = kFALSE;
3815  if (!h0 || !h1) return rc;
3816 
3817  TAxis *a0 = 0, *a1 = 0;
3818 
3819  // Check X
3820  a0 = h0->GetXaxis();
3821  a1 = h1->GetXaxis();
3822  if (a0->GetNbins() == a1->GetNbins())
3823  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3824  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3825 
3826  // Check Y, if needed
3827  if (h0->GetDimension() > 1) {
3828  rc = kFALSE;
3829  a0 = h0->GetYaxis();
3830  a1 = h1->GetYaxis();
3831  if (a0->GetNbins() == a1->GetNbins())
3832  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3833  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3834  }
3835 
3836  // Check Z, if needed
3837  if (h0->GetDimension() > 2) {
3838  rc = kFALSE;
3839  a0 = h0->GetZaxis();
3840  a1 = h1->GetZaxis();
3841  if (a0->GetNbins() == a1->GetNbins())
3842  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3843  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3844  }
3845 
3846  // Done
3847  return rc;
3848 }
3849 
3850 ////////////////////////////////////////////////////////////////////////////////
3851 /// Store received output list.
3852 
3854 {
3855  PDB(kOutput,1) Info("StoreOutput","Enter");
3856 
3857  if ( out == 0 ) {
3858  PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3859  return;
3860  }
3861 
3862  TIter next(out);
3863  out->SetOwner(kFALSE); // take ownership of the contents
3864 
3865  if (fOutputLists == 0) {
3866  PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3867  fOutputLists = new TList;
3869  }
3870  // process eventlists first
3871  TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3872  if (lists) {
3873  out->Remove(lists);
3874  TEventList *mainList = new TEventList("PROOF_EventList");
3875  out->Add(mainList);
3876  TIter it(lists);
3877  TEventList *aList;
3878  while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3879  // find file offset
3880  TIter nxe(fDSet->GetListOfElements());
3881  TDSetElement *elem;
3882  while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3883  if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3884  break;
3885  }
3886  if (!elem) {
3887  Error("StoreOutput", "found the EventList for %s, but no object with that name "
3888  "in the TDSet", aList->GetName());
3889  continue;
3890  }
3891  Long64_t offset = elem->GetTDSetOffset();
3892 
3893  // shift the list by the number of first event in that file
3894  Long64_t *arr = aList->GetList();
3895  Int_t num = aList->GetN();
3896  if (arr && offset)
3897  for (int i = 0; i < num; i++)
3898  arr[i] += offset;
3899 
3900  mainList->Add(aList); // add to the main list
3901  }
3902  delete lists;
3903  }
3904 
3905  TObject *obj;
3906  while( (obj = next()) ) {
3907  PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3908 
3909  TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3910  if ( list == 0 ) {
3911  PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3912  list = new TList;
3913  list->SetName( obj->GetName() );
3914  list->SetOwner();
3915  fOutputLists->Add( list );
3916  }
3917  list->Add( obj );
3918  }
3919 
3920  delete out;
3921  PDB(kOutput,1) Info("StoreOutput", "leave");
3922 }
3923 
3924 ////////////////////////////////////////////////////////////////////////////////
3925 /// Merge feedback lists.
3926 
3928 {
3929  PDB(kFeedback,1)
3930  Info("MergeFeedback","Enter");
3931 
3932  if ( fFeedbackLists == 0 ) {
3933  PDB(kFeedback,1)
3934  Info("MergeFeedback","Leave (no output)");
3935  return 0;
3936  }
3937 
3938  TList *fb = new TList; // collection of feedback objects
3939  fb->SetOwner();
3940 
3941  TIter next(fFeedbackLists);
3942 
3943  TMap *map;
3944  while ( (map = (TMap*) next()) ) {
3945 
3946  PDB(kFeedback,2)
3947  Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3948 
3949  // turn map into list ...
3950 
3951  TList *list = new TList;
3952  TIter keys(map);
3953 
3954 #ifndef R__TH1MERGEFIXED
3955  Int_t nbmx = -1;
3956  TObject *oref = 0;
3957 #endif
3958  while ( TObject *key = keys() ) {
3959  TObject *o = map->GetValue(key);
3960  TH1 *h = dynamic_cast<TH1 *>(o);
3961 #ifndef R__TH1MERGEFIXED
3962  // Temporary fix for to cope with the problem in TH1::Merge.
3963  // We need to use a reference histo the one with the largest number
3964  // of bins so that the histos from all submasters can be correctly
3965  // fit in
3966  if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3967  if (h->GetNbinsX() > nbmx) {
3968  nbmx= h->GetNbinsX();
3969  oref = o;
3970  }
3971  }
3972 #endif
3973  if (h) {
3974  TIter nxh(list);
3975  TH1 *href= 0;
3976  while ((href = (TH1 *)nxh())) {
3977  if (h->GetBuffer()) {
3978  if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3979  } else {
3980  if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3981  }
3982  }
3983  if (href) {
3984  list->AddBefore(href, h);
3985  } else {
3986  list->Add(h);
3987  }
3988  } else {
3989  list->Add(o);
3990  }
3991  }
3992 
3993  // clone first object, remove from list
3994 #ifdef R__TH1MERGEFIXED
3995  TObject *obj = list->First();
3996 #else
3997  TObject *obj = (oref) ? oref : list->First();
3998 #endif
3999  list->Remove(obj);
4000  obj = obj->Clone();
4001  fb->Add(obj);
4002 
4003  if ( list->IsEmpty() ) {
4004  delete list;
4005  continue;
4006  }
4007 
4008  // merge list with clone
4009  TMethodCall callEnv;
4010  if (obj->IsA())
4011  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4012  if (callEnv.IsValid()) {
4013  callEnv.SetParam((Long_t) list);
4014  callEnv.Execute(obj);
4015  } else {
4016  // No Merge interface, return copy of individual objects
4017  while ( (obj = list->First()) ) {
4018  fb->Add(obj->Clone());
4019  list->Remove(obj);
4020  }
4021  }
4022 
4023  delete list;
4024  }
4025 
4026  PDB(kFeedback,1)
4027  Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4028 
4029  return fb;
4030 }
4031 
4032 ////////////////////////////////////////////////////////////////////////////////
4033 /// Store feedback results from the specified slave.
4034 
4036 {
4037  PDB(kFeedback,1)
4038  Info("StoreFeedback","Enter");
4039 
4040  if ( out == 0 ) {
4041  PDB(kFeedback,1)
4042  Info("StoreFeedback","Leave (empty)");
4043  return;
4044  }
4045 
4046  if ( IsClient() ) {
4047  // in client
4048  Feedback(out);
4049  delete out;
4050  return;
4051  }
4052 
4053  if (fFeedbackLists == 0) {
4054  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4055  fFeedbackLists = new TList;
4057  }
4058 
4059  TIter next(out);
4060  out->SetOwner(kFALSE); // take ownership of the contents
4061 
4062  const char *ord = ((TSlave*) slave)->GetOrdinal();
4063 
4064  TObject *obj;
4065  while( (obj = next()) ) {
4066  PDB(kFeedback,2)
4067  Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4068  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4069  if ( map == 0 ) {
4070  PDB(kFeedback,2)
4071  Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4072  // Map must not be owner (ownership is with regards to the keys (only))
4073  map = new TMap;
4074  map->SetName(obj->GetName());
4075  fFeedbackLists->Add(map);
4076  } else {
4077  PDB(kFeedback,2)
4078  Info("StoreFeedback","%s: removing previous value", ord);
4079  if (map->GetValue(slave))
4080  delete map->GetValue(slave);
4081  map->Remove(slave);
4082  }
4083  map->Add(slave, obj);
4084  PDB(kFeedback,2)
4085  Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4086  }
4087 
4088  delete out;
4089  PDB(kFeedback,1)
4090  Info("StoreFeedback","Leave");
4091 }
4092 
4093 ////////////////////////////////////////////////////////////////////////////////
4094 /// Setup reporting of feedback objects.
4095 
4097 {
4098  if (IsClient()) return; // Client does not need timer
4099 
4100  fFeedback = (TList*) fInput->FindObject("FeedbackList");
4101 
4102  PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4103  fFeedback == 0 ? "NOT ":"");
4104 
4105  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4106 
4107  // OK, feedback was requested, setup the timer
4109  fFeedbackPeriod = 2000;
4110  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4111  fFeedbackTimer = new TTimer;
4112  fFeedbackTimer->SetObject(this);
4114 }
4115 
4116 ////////////////////////////////////////////////////////////////////////////////
4117 /// Stop reporting of feedback objects.
4118 
4120 {
4121  if (fFeedbackTimer == 0) return;
4122 
4123  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4124 
4126 }
4127 
4128 ////////////////////////////////////////////////////////////////////////////////
4129 /// Send feedback objects to client.
4130 
4132 {
4133  PDB(kFeedback,2) Info("HandleTimer","Entry");
4134 
4135  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4136 
4137  // process local feedback objects
4138 
4139  TList *fb = new TList;
4140  fb->SetOwner();
4141 
4142  TIter next(fFeedback);
4143  while( TObjString *name = (TObjString*) next() ) {
4144  TObject *o = fOutput->FindObject(name->GetName());
4145  if (o != 0) {
4146  fb->Add(o->Clone());
4147  // remove the corresponding entry from the feedback list
4148  TMap *m = 0;
4149  if (fFeedbackLists &&
4150  (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4152  m->DeleteValues();
4153  delete m;
4154  }
4155  }
4156  }
4157 
4158  if (fb->GetSize() > 0) {
4159  StoreFeedback(this, fb); // adopts fb
4160  } else {
4161  delete fb;
4162  }
4163 
4164  if (fFeedbackLists == 0) {
4165  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4166  return kFALSE;
4167  }
4168 
4169  fb = MergeFeedback();
4170 
4171  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4172 
4174  m << fb;
4175 
4176  // send message to client;
4177  gProofServ->GetSocket()->Send(m);
4178 
4179  delete fb;
4180 
4182 
4183  return kFALSE; // ignored?
4184 }
4185 
4186 ////////////////////////////////////////////////////////////////////////////////
4187 /// Get next packet for specified slave.
4188 
4190 {
4191  // The first call to this determines the end of initialization
4192  SetInitTime();
4193 
4194  if (fProcPackets) {
4195  Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4196  if (bin >= 0) {
4197  if (fProcPackets->GetBinContent(bin) > 0)
4198  fProcPackets->Fill(slave->GetOrdinal(), -1);
4199  }
4200  }
4201 
4202  TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4203 
4204  if (e == 0) {
4205  PDB(kPacketizer,2)
4206  Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4207  } else if (e == (TDSetElement*) -1) {
4208  PDB(kPacketizer,2)
4209  Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4210  } else {
4211  PDB(kPacketizer,2)
4212  Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4213  slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4214  e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4215  if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4216  }
4217 
4218  return e;
4219 }
4220 
4221 ////////////////////////////////////////////////////////////////////////////////
4222 /// Is the player running on the client?
4223 
4225 {
4227 }
4228 
4229 ////////////////////////////////////////////////////////////////////////////////
4230 /// Draw (support for TChain::Draw()).
4231 /// Returns -1 in case of error or number of selected events in case of success.
4232 
4234  const char *selection, Option_t *option,
4235  Long64_t nentries, Long64_t firstentry)
4236 {
4237  if (!fgDrawInputPars) {
4238  fgDrawInputPars = new THashList;
4239  fgDrawInputPars->Add(new TObjString("FeedbackList"));
4240  fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4241  fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4242  fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4243  fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4244  fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4245  fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4246  fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4247  fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4248  fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4249  fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4250  }
4251 
4252  TString selector, objname;
4253  if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4254  Error("DrawSelect", "parsing arguments");
4255  return -1;
4256  }
4257 
4258  TNamed *varexpobj = new TNamed("varexp", varexp);
4259  TNamed *selectionobj = new TNamed("selection", selection);
4260 
4261  // Save the current input list
4262  TObject *o = 0;
4263  TList *savedInput = new TList;
4264  TIter nxi(fInput);
4265  while ((o = nxi())) {
4266  savedInput->Add(o);
4267  TString n(o->GetName());
4268  if (fgDrawInputPars &&
4270  !n.BeginsWith("alias:")) fInput->Remove(o);
4271  }
4272 
4273  fInput->Add(varexpobj);
4274  fInput->Add(selectionobj);
4275 
4276  // Make sure we have an object name
4277  if (objname == "") objname = "htemp";
4278 
4279  fProof->AddFeedback(objname);
4280  Long64_t r = Process(set, selector, option, nentries, firstentry);
4281  fProof->RemoveFeedback(objname);
4282 
4283  fInput->Remove(varexpobj);
4284  fInput->Remove(selectionobj);
4285  if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4286  fInput->Remove(opt);
4287  delete opt;
4288  }
4289 
4290  delete varexpobj;
4291  delete selectionobj;
4292 
4293  // Restore the input list
4294  fInput->Clear();
4295  TIter nxsi(savedInput);
4296  while ((o = nxsi()))
4297  fInput->Add(o);
4298  savedInput->SetOwner(kFALSE);
4299  delete savedInput;
4300 
4301  return r;
4302 }
4303 
4304 ////////////////////////////////////////////////////////////////////////////////
4305 /// Set init time
4306 
4308 {
4309  if (fPacketizer)
4311 }
4312 
4313 //------------------------------------------------------------------------------
4314 
4315 
4317 
4318 ////////////////////////////////////////////////////////////////////////////////
4319 /// Setup feedback.
4320 
4322 {
4323  TList *fb = (TList*) fInput->FindObject("FeedbackList");
4324  if (fb) {
4325  PDB(kFeedback,1)
4326  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4327  } else {
4328  PDB(kFeedback,1)
4329  Info("SetupFeedback","\"FeedbackList\" NOT found");
4330  }
4331 
4332  if (fb == 0 || fb->GetSize() == 0) return;
4333 
4334  // OK, feedback was requested, setup the timer
4335 
4337  fFeedbackPeriod = 2000;
4338  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4339  fFeedbackTimer = new TTimer;
4340  fFeedbackTimer->SetObject(this);
4342 
4343  fFeedback = fb;
4344 }
4345 
4346 ////////////////////////////////////////////////////////////////////////////////
4347 /// Stop feedback.
4348 
4350 {
4351  if (fFeedbackTimer == 0) return;
4352 
4353  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4354 
4356 }
4357 
4358 ////////////////////////////////////////////////////////////////////////////////
4359 /// Handle timer event.
4360 
4362 {
4363  PDB(kFeedback,2) Info("HandleTimer","Entry");
4364 
4365  // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4366  // so we also send the info to update the progress bar.
4367  if (gProofServ) {
4368  Bool_t sendm = kFALSE;
4370  if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4371  sendm = kTRUE;
4372  if (gProofServ->GetProtocol() > 25) {
4373  m << GetProgressStatus();
4374  } else if (gProofServ->GetProtocol() > 11) {
4376  m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4377  << (Float_t) -1. << (Float_t) ps->GetProcTime()
4378  << (Float_t) ps->GetRate() << (Float_t) -1.;
4379  } else {
4381  }
4382  }
4383  if (sendm) gProofServ->GetSocket()->Send(m);
4384  }
4385 
4386  if (fFeedback == 0) return kFALSE;
4387 
4388  TList *fb = new TList;
4389  fb->SetOwner(kFALSE);
4390 
4391  if (fOutput == 0) {
4393  }
4394 
4395  if (fOutput) {
4396  TIter next(fFeedback);
4397  while( TObjString *name = (TObjString*) next() ) {
4398  // TODO: find object in memory ... maybe allow only in fOutput ?
4399  TObject *o = fOutput->FindObject(name->GetName());
4400  if (o != 0) fb->Add(o);
4401  }
4402  }
4403 
4404  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4405 
4407  m << fb;
4408 
4409  // send message to client;
4410  if (gProofServ) gProofServ->GetSocket()->Send(m);
4411 
4412  delete fb;
4413 
4415 
4416  return kFALSE; // ignored?
4417 }
4418 
4419 ////////////////////////////////////////////////////////////////////////////////
4420 /// Handle tree header request.
4421 
4423 {
4425 
4426  TDSet *dset;
4427  (*mess) >> dset;
4428  dset->Reset();
4429  TDSetElement *e = dset->Next();
4430  Long64_t entries = 0;
4431  TFile *f = 0;
4432  TTree *t = 0;
4433  if (!e) {
4434  PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4435  } else {
4436  f = TFile::Open(e->GetFileName());
4437  t = 0;
4438  if (f) {
4439  t = (TTree*) f->Get(e->GetObjName());
4440  if (t) {
4441  t->SetMaxVirtualSize(0);
4442  t->DropBaskets();
4443  entries = t->GetEntries();
4444 
4445  // compute #entries in all the files
4446  while ((e = dset->Next()) != 0) {
4447  TFile *f1 = TFile::Open(e->GetFileName());
4448  if (f1) {
4449  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4450  if (t1) {
4451  entries += t1->GetEntries();
4452  delete t1;
4453  }
4454  delete f1;
4455  }
4456  }
4457  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4458  }
4459  }
4460  }
4461  if (t)
4462  answ << TString("Success") << t;
4463  else
4464  answ << TString("Failed") << t;
4465 
4466  fSocket->Send(answ);
4467 
4468  SafeDelete(t);
4469  SafeDelete(f);
4470 }
4471 
4472 
4473 //------------------------------------------------------------------------------
4474 
4476 
4477 ////////////////////////////////////////////////////////////////////////////////
4478 /// Process specified TDSet on PROOF. Runs on super master.
4479 /// The return value is -1 in case of error and TSelector::GetStatus() in
4480 /// in case of success.
4481 
4482 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4483  Option_t *option, Long64_t nentries,
4484  Long64_t first)
4485 {
4487  PDB(kGlobal,1) Info("Process","Enter");
4488 
4489  TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4490  if (!proof) return -1;
4491 
4492  delete fOutput;
4493  fOutput = new THashList;
4494 
4496 
4497  if (!SendSelector(selector_file)) {
4498  Error("Process", "sending selector %s", selector_file);
4499  return -1;
4500  }
4501 
4502  TCleanup clean(this);
4503  SetupFeedback();
4504 
4505  if (proof->IsMaster()) {
4506 
4507  // make sure the DSet is valid
4508  if (!dset->ElementsValid()) {
4509  proof->ValidateDSet(dset);
4510  if (!dset->ElementsValid()) {
4511  Error("Process", "could not validate TDSet");
4512  return -1;
4513  }
4514  }
4515 
4516  TList msds;
4517  msds.SetOwner(); // This will delete TPairs
4518 
4519  TList keyholder; // List to clean up key part of the pairs
4520  keyholder.SetOwner();
4521  TList valueholder; // List to clean up value part of the pairs
4522  valueholder.SetOwner();
4523 
4524  // Construct msd list using the slaves
4525  TIter nextslave(proof->GetListOfActiveSlaves());
4526  while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4527  TList *submasters = 0;
4528  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4529  if (!msd) {
4530  submasters = new TList;
4531  submasters->SetName(sl->GetMsd());
4532  keyholder.Add(submasters);
4533  TList *setelements = new TSortedList(kSortDescending);
4534  setelements->SetName(TString(sl->GetMsd())+"_Elements");
4535  valueholder.Add(setelements);
4536  msds.Add(new TPair(submasters, setelements));
4537  } else {
4538  submasters = dynamic_cast<TList*>(msd->Key());
4539  }
4540  if (submasters) submasters->Add(sl);
4541  }
4542 
4543  // Add TDSetElements to msd list
4544  Long64_t cur = 0; //start of next element
4545  TIter nextelement(dset->GetListOfElements());
4546  while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4547 
4548  if (elem->GetNum()<1) continue; // get rid of empty elements
4549 
4550  if (nentries !=-1 && cur>=first+nentries) {
4551  // we are done
4552  break;
4553  }
4554 
4555  if (cur+elem->GetNum()-1<first) {
4556  //element is before first requested entry
4557  cur+=elem->GetNum();
4558  continue;
4559  }
4560 
4561  if (cur<first) {
4562  //modify element to get proper start
4563  elem->SetNum(elem->GetNum()-(first-cur));
4564  elem->SetFirst(elem->GetFirst()+first-cur);
4565  cur=first;
4566  }
4567 
4568  if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4569  cur+=elem->GetNum();
4570  } else {
4571  //modify element to get proper end
4572  elem->SetNum(first+nentries-cur);
4573  cur=first+nentries;
4574  }
4575 
4576  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4577  if (!msd) {
4578  Error("Process", "data requires mass storage domain '%s'"
4579  " which is not accessible in this proof session",
4580  elem->GetMsd());
4581  return -1;
4582  } else {
4583  TList *elements = dynamic_cast<TList*>(msd->Value());
4584  if (elements) elements->Add(elem);
4585  }
4586  }
4587 
4588  TList usedmasters;
4589  TIter nextmsd(msds.MakeIterator());
4590  while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4591  TList *submasters = dynamic_cast<TList*>(msd->Key());
4592  TList *setelements = dynamic_cast<TList*>(msd->Value());
4593 
4594  // distribute elements over the masters
4595  Int_t nmasters = submasters ? submasters->GetSize() : -1;
4596  Int_t nelements = setelements ? setelements->GetSize() : -1;
4597  for (Int_t i=0; i<nmasters; i++) {
4598 
4599  Long64_t nent = 0;
4600  TDSet set(dset->GetType(), dset->GetObjName(),
4601  dset->GetDirectory());
4602  for (Int_t j = (i*nelements)/nmasters;
4603  j < ((i+1)*nelements)/nmasters;
4604  j++) {
4605  TDSetElement *elem = setelements ?
4606  dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4607  if (elem) {
4608  set.Add(elem->GetFileName(), elem->GetObjName(),
4609  elem->GetDirectory(), elem->GetFirst(),
4610  elem->GetNum(), elem->GetMsd());
4611  nent += elem->GetNum();
4612  } else {
4613  Warning("Process", "not a TDSetElement object");
4614  }
4615  }
4616 
4617  if (set.GetListOfElements()->GetSize()>0) {
4618  TMessage mesg(kPROOF_PROCESS);
4619  TString fn(gSystem->BaseName(selector_file));
4620  TString opt = option;
4621  mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4622 
4623  TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4624  if (sl) {
4625  PDB(kGlobal,1) Info("Process",
4626  "Sending TDSet with %d elements to submaster %s",
4627  set.GetListOfElements()->GetSize(),
4628  sl->GetOrdinal());
4629  sl->GetSocket()->Send(mesg);
4630  usedmasters.Add(sl);
4631 
4632  // setup progress info
4633  fSlaves.AddLast(sl);
4637  fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4645  fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4647  fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4649  fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4651  fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4653  fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4654  } else {
4655  Warning("Process", "not a TSlave object");
4656  }
4657  }
4658  }
4659  }
4660 
4661  if ( !IsClient() ) HandleTimer(0);
4662  PDB(kGlobal,1) Info("Process","Calling Collect");
4663  proof->Collect(&usedmasters);
4664  HandleTimer(0);
4665 
4666  }
4667 
4668  StopFeedback();
4669 
4670  PDB(kGlobal,1) Info("Process","Calling Merge Output");
4671  MergeOutput();
4672 
4673  TPerfStats::Stop();
4674 
4675  return 0;
4676 }
4677 
4678 ////////////////////////////////////////////////////////////////////////////////
4679 /// Report progress.
4680 
4682 {
4683  Int_t idx = fSlaves.IndexOf(sl);
4684  fSlaveProgress[idx] = processed;
4685  if (fSlaveTotals[idx] != total)
4686  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4687  fSlaveTotals[idx] = total;
4688 
4689  Long64_t tot = 0;
4690  Int_t i;
4691  for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4692  Long64_t proc = 0;
4693  for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4694 
4695  Progress(tot, proc);
4696 }
4697 
4698 ////////////////////////////////////////////////////////////////////////////////
4699 /// Report progress.
4700 
4702  Long64_t processed, Long64_t bytesread,
4703  Float_t initTime, Float_t procTime,
4704  Float_t evtrti, Float_t mbrti)
4705 {
4706  PDB(kGlobal,2)
4707  Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4708  processed, bytesread, initTime, procTime, evtrti, mbrti);
4709 
4710  Int_t idx = fSlaves.IndexOf(sl);
4711  if (fSlaveTotals[idx] != total)
4712  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4713  fSlaveTotals[idx] = total;
4714  fSlaveProgress[idx] = processed;
4715  fSlaveBytesRead[idx] = bytesread;
4716  fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4717  fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4718  fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4719  fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4720 
4721  Int_t i;
4722  Long64_t tot = 0;
4723  Long64_t proc = 0;
4724  Long64_t bytes = 0;
4725  Float_t init = -1.;
4726  Float_t ptime = -1.;
4727  Float_t erti = 0.;
4728  Float_t srti = 0.;
4729  Int_t nerti = 0;
4730  Int_t nsrti = 0;
4731  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4732  tot += fSlaveTotals[i];
4733  if (i < fSlaveProgress.GetSize())
4734  proc += fSlaveProgress[i];
4735  if (i < fSlaveBytesRead.GetSize())
4736  bytes += fSlaveBytesRead[i];
4737  if (i < fSlaveInitTime.GetSize())
4738  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4739  init = fSlaveInitTime[i];
4740  if (i < fSlaveProcTime.GetSize())
4741  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4742  ptime = fSlaveProcTime[i];
4743  if (i < fSlaveEvtRti.GetSize())
4744  if (fSlaveEvtRti[i] > -1.) {
4745  erti += fSlaveEvtRti[i];
4746  nerti++;
4747  }
4748  if (i < fSlaveMBRti.GetSize())
4749  if (fSlaveMBRti[i] > -1.) {
4750  srti += fSlaveMBRti[i];
4751  nsrti++;
4752  }
4753  }
4754  srti = (nsrti > 0) ? srti / nerti : 0.;
4755 
4756  Progress(tot, proc, bytes, init, ptime, erti, srti);
4757 }
4758 
4759 ////////////////////////////////////////////////////////////////////////////////
4760 /// Progress signal.
4761 
4763 {
4764  if (pi) {
4765  PDB(kGlobal,2)
4766  Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4767  pi->fTotal, pi->fProcessed, pi->fBytesRead,
4768  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4769  pi->fActWorkers, pi->fEffSessions);
4770 
4771  Int_t idx = fSlaves.IndexOf(wrk);
4772  if (fSlaveTotals[idx] != pi->fTotal)
4773  Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4774  fSlaveTotals[idx] = pi->fTotal;
4775  fSlaveProgress[idx] = pi->fProcessed;
4776  fSlaveBytesRead[idx] = pi->fBytesRead;
4777  fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4778  fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4779  fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4780  fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4781  fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4782  fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4783  fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4784 
4785  Int_t i;
4786  Int_t nerti = 0;
4787  Int_t nsrti = 0;
4788  TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4789  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4790  pisum.fTotal += fSlaveTotals[i];
4791  if (i < fSlaveProgress.GetSize())
4792  pisum.fProcessed += fSlaveProgress[i];
4793  if (i < fSlaveBytesRead.GetSize())
4794  pisum.fBytesRead += fSlaveBytesRead[i];
4795  if (i < fSlaveInitTime.GetSize())
4796  if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4797  pisum.fInitTime = fSlaveInitTime[i];
4798  if (i < fSlaveProcTime.GetSize())
4799  if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4800  pisum.fProcTime = fSlaveProcTime[i];
4801  if (i < fSlaveEvtRti.GetSize())
4802  if (fSlaveEvtRti[i] > -1.) {
4803  pisum.fEvtRateI += fSlaveEvtRti[i];
4804  nerti++;
4805  }
4806  if (i < fSlaveMBRti.GetSize())
4807  if (fSlaveMBRti[i] > -1.) {
4808  pisum.fMBRateI += fSlaveMBRti[i];
4809  nsrti++;
4810  }
4811  if (i < fSlaveActW.GetSize())
4812  pisum.fActWorkers += fSlaveActW[i];
4813  if (i < fSlaveTotS.GetSize())
4814  if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4815  pisum.fTotSessions = fSlaveTotS[i];
4816  if (i < fSlaveEffS.GetSize())
4817  if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4818  pisum.fEffSessions = fSlaveEffS[i];
4819  }
4820  pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4821 
4822  Progress(&pisum);
4823  }
4824 }
4825 
4826 ////////////////////////////////////////////////////////////////////////////////
4827 /// Send progress and feedback to client.
4828 
4830 {
4831  if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4832 
4833  Int_t i;
4834  Long64_t tot = 0;
4835  Long64_t proc = 0;
4836  Long64_t bytes = 0;
4837  Float_t init = -1.;
4838  Float_t ptime = -1.;
4839  Float_t erti = 0.;
4840  Float_t srti = 0.;
4841  Int_t nerti = 0;
4842  Int_t nsrti = 0;
4843  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4844  tot += fSlaveTotals[i];
4845  if (i < fSlaveProgress.GetSize())
4846  proc += fSlaveProgress[i];
4847  if (i < fSlaveBytesRead.GetSize())
4848  bytes += fSlaveBytesRead[i];
4849  if (i < fSlaveInitTime.GetSize())
4850  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4851  init = fSlaveInitTime[i];
4852  if (i < fSlaveProcTime.GetSize())
4853  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4854  ptime = fSlaveProcTime[i];
4855  if (i < fSlaveEvtRti.GetSize())
4856  if (fSlaveEvtRti[i] > -1.) {
4857  erti += fSlaveEvtRti[i];
4858  nerti++;
4859  }
4860  if (i < fSlaveMBRti.GetSize())
4861  if (fSlaveMBRti[i] > -1.) {
4862  srti += fSlaveMBRti[i];
4863  nsrti++;
4864  }
4865  }
4866  erti = (nerti > 0) ? erti / nerti : 0.;
4867  srti = (nsrti > 0) ? srti / nerti : 0.;
4868 
4870  if (gProofServ->GetProtocol() > 25) {
4871  // Fill the message now
4872  TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4873  erti, srti, -1,
4875  m << &pi;
4876  } else {
4877 
4878  m << tot << proc << bytes << init << ptime << erti << srti;
4879  }
4880 
4881  // send message to client;
4882  gProofServ->GetSocket()->Send(m);
4883 
4884  if (fReturnFeedback)
4886  else
4887  return</