Logo ROOT  
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofPlayer
13 \ingroup proofkernel
14 
15 Internal class steering processing in PROOF.
16 Instances of the TProofPlayer class are created on the worker nodes
17 per session and do the processing.
18 Instances of its subclass - TProofPlayerRemote are created per each
19 query on the master(s) and on the client. On the master(s),
20 TProofPlayerRemote coordinate processing, check the dataset, create
21 the packetizer and take care of merging the results of the workers.
22 The instance on the client collects information on the input
23 (dataset and selector), it invokes the Begin() method and finalizes
24 the query by calling Terminate().
25 
26 */
27 
28 #include "TProofDraw.h"
29 #include "TProofPlayer.h"
30 #include "THashList.h"
31 #include "TEnv.h"
32 #include "TEventIter.h"
33 #include "TVirtualPacketizer.h"
34 #include "TSelector.h"
35 #include "TSocket.h"
36 #include "TProofServ.h"
37 #include "TProof.h"
38 #include "TProofOutputFile.h"
39 #include "TProofSuperMaster.h"
40 #include "TSlave.h"
41 #include "TClass.h"
42 #include "TROOT.h"
43 #include "TError.h"
44 #include "TException.h"
45 #include "MessageTypes.h"
46 #include "TMessage.h"
47 #include "TDSetProxy.h"
48 #include "TString.h"
49 #include "TSystem.h"
50 #include "TFile.h"
51 #include "TFileCollection.h"
52 #include "TFileInfo.h"
53 #include "TFileMerger.h"
54 #include "TProofDebug.h"
55 #include "TTimer.h"
56 #include "TMap.h"
57 #include "TPerfStats.h"
58 #include "TStatus.h"
59 #include "TEventList.h"
60 #include "TProofLimitsFinder.h"
61 #include "TSortedList.h"
62 #include "TTree.h"
63 #include "TEntryList.h"
64 #include "TDSet.h"
65 #include "TDrawFeedback.h"
66 #include "TNamed.h"
67 #include "TObjString.h"
68 #include "TQueryResult.h"
69 #include "TMD5.h"
70 #include "TMethodCall.h"
71 #include "TObjArray.h"
72 #include "TH1.h"
73 #include "TVirtualMonitoring.h"
74 #include "TParameter.h"
76 #include "TStopwatch.h"
77 
78 // Timeout exception
79 #define kPEX_STOPPED 1001
80 #define kPEX_ABORTED 1002
81 
82 // To flag an abort condition: use a local static variable to avoid
83 // warnings about problems with longjumps
84 static Bool_t gAbort = kFALSE;
85 
86 class TAutoBinVal : public TNamed {
87 private:
88  Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
89 
90 public:
91  TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
92  Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
93  {
94  fXmin = xmin; fXmax = xmax;
95  fYmin = ymin; fYmax = ymax;
96  fZmin = zmin; fZmax = zmax;
97  }
98  void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
99  Double_t& ymax, Double_t& zmin, Double_t& zmax)
100  {
101  xmin = fXmin; xmax = fXmax;
102  ymin = fYmin; ymax = fYmax;
103  zmin = fZmin; zmax = fZmax;
104  }
105 
106 };
107 
108 //
109 // Special timer to dispatch pending events while processing
110 ////////////////////////////////////////////////////////////////////////////////
111 
112 class TDispatchTimer : public TTimer {
113 private:
114  TProofPlayer *fPlayer;
115 
116 public:
117  TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
118 
119  Bool_t Notify();
120 };
121 ////////////////////////////////////////////////////////////////////////////////
122 /// Handle expiration of the timer associated with dispatching pending
123 /// events while processing. We must act as fast as possible here, so
124 /// we just set a flag submitting a request for dispatching pending events
125 
126 Bool_t TDispatchTimer::Notify()
127 {
128  if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
129 
130  fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
131 
132  // Needed for the next shot
133  Reset();
134  return kTRUE;
135 }
136 
137 //
138 // Special timer to notify reach of max packet proc time
139 ////////////////////////////////////////////////////////////////////////////////
140 
141 class TProctimeTimer : public TTimer {
142 private:
143  TProofPlayer *fPlayer;
144 
145 public:
146  TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
147 
148  Bool_t Notify();
149 };
150 ////////////////////////////////////////////////////////////////////////////////
151 /// Handle expiration of the timer associated with dispatching pending
152 /// events while processing. We must act as fast as possible here, so
153 /// we just set a flag submitting a request for dispatching pending events
154 
155 Bool_t TProctimeTimer::Notify()
156 {
157  if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
158 
159  fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
160 
161  // One shot only
162  return kTRUE;
163 }
164 
165 //
166 // Special timer to handle stop/abort request via exception raising
167 ////////////////////////////////////////////////////////////////////////////////
168 
169 class TStopTimer : public TTimer {
170 private:
171  Bool_t fAbort;
172  TProofPlayer *fPlayer;
173 
174 public:
175  TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
176 
177  Bool_t Notify();
178 };
179 
180 ////////////////////////////////////////////////////////////////////////////////
181 /// Constructor for the timer to stop/abort processing.
182 /// The 'timeout' is in seconds.
183 /// Make sure that 'to' make sense, i.e. not larger than 10 days;
184 /// the minimum value is 10 ms (0 does not seem to start the timer ...).
185 
186 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
187  : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
188 {
189  if (gDebug > 0)
190  Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
191 
192  fPlayer = p;
193  fAbort = abort;
194 
195  if (gDebug > 1)
196  Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
197 }
198 
199 ////////////////////////////////////////////////////////////////////////////////
200 /// Handle the signal coming from the expiration of the timer
201 /// associated with an abort or stop request.
202 /// We raise an exception which will be processed in the
203 /// event loop.
204 
205 Bool_t TStopTimer::Notify()
206 {
207  if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
208 
209  if (fAbort)
211  else
213 
214  return kTRUE;
215 }
216 
217 //------------------------------------------------------------------------------
218 
220 
222 
223 ////////////////////////////////////////////////////////////////////////////////
224 /// Default ctor.
225 
227  : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
228  fFeedbackTimer(0), fFeedbackPeriod(2000),
229  fEvIter(0), fSelStatus(0),
230  fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
231  fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
232  fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
233  fProcTimeTimer(0), fProcTime(0),
234  fOutputFile(0),
235  fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
236 {
237  fInput = new TList;
244 
245  static Bool_t initLimitsFinder = kFALSE;
246  if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
248  initLimitsFinder = kTRUE;
249  }
250 
251 }
252 
253 ////////////////////////////////////////////////////////////////////////////////
254 /// Destructor.
255 
257 {
258  fInput->Clear("nodelete");
260  // The output list is owned by fSelector and destroyed in there
269 }
270 
271 ////////////////////////////////////////////////////////////////////////////////
272 /// Set processing bit according to 'on'
273 
275 {
276  if (on)
278  else
280 }
281 
282 ////////////////////////////////////////////////////////////////////////////////
283 /// Stop the process after this event. If timeout is positive, start
284 /// a timer firing after timeout seconds to hard-stop time-expensive
285 /// events.
286 
288 {
289  if (gDebug > 0)
290  Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
291 
292  if (fEvIter != 0)
293  fEvIter->StopProcess(abort);
294  Long_t to = 1;
295  if (abort == kTRUE) {
297  } else {
299  to = timeout;
300  }
301  // Start countdown, if needed
302  if (to > 0)
303  SetStopTimer(kTRUE, abort, to);
304 }
305 
306 ////////////////////////////////////////////////////////////////////////////////
307 /// Enable/disable the timer to dispatch pening events while processing.
308 
310 {
313  if (on) {
314  fDispatchTimer = new TDispatchTimer(this);
316  }
317 }
318 
319 ////////////////////////////////////////////////////////////////////////////////
320 /// Enable/disable the timer to stop/abort processing.
321 /// The 'timeout' is in seconds.
322 
324 {
325  std::lock_guard<std::mutex> lock(fStopTimerMtx);
326 
327  // Clean-up the timer
329  if (on) {
330  // create timer
331  fStopTimer = new TStopTimer(this, abort, timeout);
332  // Start the countdown
333  fStopTimer->Start();
334  if (gDebug > 0)
335  Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
336  (abort ? "ABORT" : "STOP"), timeout);
337  } else {
338  if (gDebug > 0)
339  Info ("SetStopTimer", "timer STOPPED");
340  }
341 }
342 
343 ////////////////////////////////////////////////////////////////////////////////
344 /// Add query result to the list, making sure that there are no
345 /// duplicates.
346 
348 {
349  if (!q) {
350  Warning("AddQueryResult","query undefined - do nothing");
351  return;
352  }
353 
354  // Treat differently normal and draw queries
355  if (!(q->IsDraw())) {
356  if (!fQueryResults) {
357  fQueryResults = new TList;
358  fQueryResults->Add(q);
359  } else {
360  TIter nxr(fQueryResults);
361  TQueryResult *qr = 0;
362  TQueryResult *qp = 0;
363  while ((qr = (TQueryResult *) nxr())) {
364  // If same query, remove old version and break
365  if (*qr == *q) {
366  fQueryResults->Remove(qr);
367  delete qr;
368  break;
369  }
370  // Record position according to start time
371  if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
372  qp = qr;
373  }
374 
375  if (!qp) {
377  } else {
378  fQueryResults->AddAfter(qp, q);
379  }
380  }
381  } else if (IsClient()) {
382  // If max reached, eliminate first the oldest one
384  TIter nxr(fQueryResults);
385  TQueryResult *qr = 0;
386  while ((qr = (TQueryResult *) nxr())) {
387  // If same query, remove old version and break
388  if (qr->IsDraw()) {
389  fDrawQueries--;
390  fQueryResults->Remove(qr);
391  delete qr;
392  break;
393  }
394  }
395  }
396  // Add new draw query
397  if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
398  fDrawQueries++;
399  if (!fQueryResults)
400  fQueryResults = new TList;
401  fQueryResults->Add(q);
402  }
403  }
404 }
405 
406 ////////////////////////////////////////////////////////////////////////////////
407 /// Remove all query result instances referenced 'ref' from
408 /// the list of results.
409 
410 void TProofPlayer::RemoveQueryResult(const char *ref)
411 {
412  if (fQueryResults) {
413  TIter nxq(fQueryResults);
414  TQueryResult *qr = 0;
415  while ((qr = (TQueryResult *) nxq())) {
416  if (qr->Matches(ref)) {
417  fQueryResults->Remove(qr);
418  delete qr;
419  }
420  }
421  }
422 }
423 
424 ////////////////////////////////////////////////////////////////////////////////
425 /// Get query result instances referenced 'ref' from
426 /// the list of results.
427 
429 {
430  if (fQueryResults) {
431  if (ref && strlen(ref) > 0) {
432  TIter nxq(fQueryResults);
433  TQueryResult *qr = 0;
434  while ((qr = (TQueryResult *) nxq())) {
435  if (qr->Matches(ref))
436  return qr;
437  }
438  } else {
439  // Get last
440  return (TQueryResult *) fQueryResults->Last();
441  }
442  }
443 
444  // Nothing found
445  return (TQueryResult *)0;
446 }
447 
448 ////////////////////////////////////////////////////////////////////////////////
449 /// Set current query and save previous value.
450 
452 {
454  fQuery = q;
455 }
456 
457 ////////////////////////////////////////////////////////////////////////////////
458 /// Add object to input list.
459 
461 {
462  fInput->Add(inp);
463 }
464 
465 ////////////////////////////////////////////////////////////////////////////////
466 /// Clear input list.
467 
469 {
470  fInput->Clear();
471 }
472 
473 ////////////////////////////////////////////////////////////////////////////////
474 /// Get output object by name.
475 
477 {
478  if (fOutput)
479  return fOutput->FindObject(name);
480  return 0;
481 }
482 
483 ////////////////////////////////////////////////////////////////////////////////
484 /// Get output list.
485 
487 {
488  TList *ol = fOutput;
489  if (!ol && fQuery)
490  ol = fQuery->GetOutputList();
491  return ol;
492 }
493 
494 ////////////////////////////////////////////////////////////////////////////////
495 /// Reinitialize fSelector using the selector files in the query result.
496 /// Needed when Finalize is called after a Process execution for the same
497 /// selector name.
498 
500 {
501  Int_t rc = 0;
502 
503  // Make sure we have a query
504  if (!qr) {
505  Info("ReinitSelector", "query undefined - do nothing");
506  return -1;
507  }
508 
509  // Selector name
510  TString selec = qr->GetSelecImp()->GetName();
511  if (selec.Length() <= 0) {
512  Info("ReinitSelector", "selector name undefined - do nothing");
513  return -1;
514  }
515 
516  // Find out if this is a standard selection used for Draw actions
517  Bool_t stdselec = TSelector::IsStandardDraw(selec);
518 
519  // Find out if this is a precompiled selector: in such a case we do not
520  // have the code in TMacros, so we must rely on local libraries
521  Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
522 
523  // If not, find out if it needs to be expanded
524  TString ipathold;
525  if (!stdselec && !compselec) {
526  // Check checksums for the versions of the selector files
527  Bool_t expandselec = kTRUE;
528  TString dir, ipath;
529  char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
530  if (selc) {
531  // Check checksums
532  TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
533  // Implementation files
534  md5icur = TMD5::FileChecksum(selc);
535  md5iold = qr->GetSelecImp()->Checksum();
536  // Header files
537  TString selh(selc);
538  Int_t dot = selh.Last('.');
539  if (dot != kNPOS) selh.Remove(dot);
540  selh += ".h";
542  md5hcur = TMD5::FileChecksum(selh);
543  md5hold = qr->GetSelecHdr()->Checksum();
544 
545  // If nothing has changed nothing to do
546  if (md5hcur && md5hold && md5icur && md5iold)
547  if (*md5hcur == *md5hold && *md5icur == *md5iold)
548  expandselec = kFALSE;
549 
550  SafeDelete(md5icur);
551  SafeDelete(md5hcur);
552  SafeDelete(md5iold);
553  SafeDelete(md5hold);
554  if (selc) delete [] selc;
555  }
556 
557  Bool_t ok = kTRUE;
558  // Expand selector files, if needed
559  if (expandselec) {
560 
561  ok = kFALSE;
562  // Expand files in a temporary directory
563  TUUID u;
564  dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
565  if (!(gSystem->MakeDirectory(dir))) {
566 
567  // Export implementation file
568  selec = Form("%s/%s",dir.Data(),selec.Data());
569  qr->GetSelecImp()->SaveSource(selec);
570 
571  // Export header file
572  TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
573  qr->GetSelecHdr()->SaveSource(seleh);
574 
575  // Adjust include path
576  ipathold = gSystem->GetIncludePath();
577  ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
578  gSystem->SetIncludePath(ipath.Data());
579 
580  ok = kTRUE;
581  }
582  }
583  TString opt(qr->GetOptions());
584  Ssiz_t id = opt.Last('#');
585  if (id != kNPOS && id < opt.Length() - 1)
586  selec += opt(id + 1, opt.Length());
587 
588  if (!ok) {
589  Info("ReinitSelector", "problems locating or exporting selector files");
590  return -1;
591  }
592  }
593 
594  // Cleanup previous stuff
596  fSelectorClass = 0;
597 
598  // Init the selector now
599  Int_t iglevelsave = gErrorIgnoreLevel;
600  if (compselec)
601  // Silent error printout on first attempt
603 
604  if ((fSelector = TSelector::GetSelector(selec))) {
605  if (compselec)
606  gErrorIgnoreLevel = iglevelsave; // restore ignore level
607  fSelectorClass = fSelector->IsA();
609 
610  } else {
611  if (compselec) {
612  gErrorIgnoreLevel = iglevelsave; // restore ignore level
613  // Retry by loading first the libraries listed in TQueryResult, if any
614  if (strlen(qr->GetLibList()) > 0) {
615  TString sl(qr->GetLibList());
616  TObjArray *oa = sl.Tokenize(" ");
617  if (oa) {
618  Bool_t retry = kFALSE;
619  TIter nxl(oa);
620  TObjString *os = 0;
621  while ((os = (TObjString *) nxl())) {
622  TString lib = gSystem->BaseName(os->GetName());
623  if (lib != "lib") {
624  lib.ReplaceAll("-l", "lib");
625  if (gSystem->Load(lib) == 0)
626  retry = kTRUE;
627  }
628  }
629  // Retry now, if the case
630  if (retry)
632  }
633  }
634  }
635  if (!fSelector) {
636  if (compselec)
637  Info("ReinitSelector", "compiled selector re-init failed:"
638  " automatic reload unsuccessful:"
639  " please load manually the correct library");
640  rc = -1;
641  }
642  }
643  if (fSelector) {
644  // Draw needs to reinit temp histos
646  if (stdselec) {
647  ((TProofDraw *)fSelector)->DefVar();
648  } else {
649  // variables may have been initialized in Begin()
650  fSelector->Begin(0);
651  }
652  }
653 
654  // Restore original include path, if needed
655  if (ipathold.Length() > 0)
656  gSystem->SetIncludePath(ipathold.Data());
657 
658  return rc;
659 }
660 
661 ////////////////////////////////////////////////////////////////////////////////
662 /// Incorporate output object (may not be used in this class).
663 
665 {
666  MayNotUse("AddOutputObject");
667  return -1;
668 }
669 
670 ////////////////////////////////////////////////////////////////////////////////
671 /// Incorporate output list (may not be used in this class).
672 
674 {
675  MayNotUse("AddOutput");
676 }
677 
678 ////////////////////////////////////////////////////////////////////////////////
679 /// Store output list (may not be used in this class).
680 
682 {
683  MayNotUse("StoreOutput");
684 }
685 
686 ////////////////////////////////////////////////////////////////////////////////
687 /// Store feedback list (may not be used in this class).
688 
690 {
691  MayNotUse("StoreFeedback");
692 }
693 
694 ////////////////////////////////////////////////////////////////////////////////
695 /// Report progress (may not be used in this class).
696 
697 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
698 {
699  MayNotUse("Progress");
700 }
701 
702 ////////////////////////////////////////////////////////////////////////////////
703 /// Report progress (may not be used in this class).
704 
705 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
706  Long64_t /*bytesread*/,
707  Float_t /*evtRate*/, Float_t /*mbRate*/,
708  Float_t /*evtrti*/, Float_t /*mbrti*/)
709 {
710  MayNotUse("Progress");
711 }
712 
713 ////////////////////////////////////////////////////////////////////////////////
714 /// Report progress (may not be used in this class).
715 
717 {
718  MayNotUse("Progress");
719 }
720 
721 ////////////////////////////////////////////////////////////////////////////////
722 /// Set feedback list (may not be used in this class).
723 
725 {
726  MayNotUse("Feedback");
727 }
728 
729 ////////////////////////////////////////////////////////////////////////////////
730 /// Draw feedback creation proxy. When accessed via TProof avoids
731 /// link dependency on libProofPlayer.
732 
734 {
735  return new TDrawFeedback(p);
736 }
737 
738 ////////////////////////////////////////////////////////////////////////////////
739 /// Set draw feedback option.
740 
742 {
743  if (f)
744  f->SetOption(opt);
745 }
746 
747 ////////////////////////////////////////////////////////////////////////////////
748 /// Delete draw feedback object.
749 
751 {
752  delete f;
753 }
754 
755 ////////////////////////////////////////////////////////////////////////////////
756 /// Save the partial results of this query to a dedicated file under the user
757 /// data directory. The file name has the form
758 /// <session_tag>.q<query_seq_num>.root
759 /// The file pat and the file are created if not existing already.
760 /// Only objects in the outputlist not being TProofOutputFile are saved.
761 /// The packets list 'packets' is saved if given.
762 /// Trees not attached to any file are attached to the open file.
763 /// If 'queryend' is kTRUE evrything is written out (TTrees included).
764 /// The actual saving action is controlled by 'force' and by fSavePartialResults /
765 /// fSaveResultsPerPacket:
766 ///
767 /// fSavePartialResults = kFALSE/kTRUE no-saving/saving
768 /// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
769 ///
770 /// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
771 /// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
772 /// are always saved and expensive calls to TSystem::GetProcInfo saved.
773 /// The switch fSaveResultsPerPacket is instead controlled by the user or admin
774 /// who can also force saving in all cases; parameter PROOF_SavePartialResults or
775 /// RC env ProofPlayer.SavePartialResults .
776 /// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
777 /// are ignored.
778 /// Return -1 in case of problems, 0 otherwise.
779 
781 {
782  Bool_t save = (force || (fSavePartialResults &&
783  (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
784  if (!save) {
785  PDB(kOutput, 2)
786  Info("SavePartialResults", "partial result saving disabled");
787  return 0;
788  }
789 
790  // Sanity check
791  if (!gProofServ) {
792  Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
793  return -1;
794  }
795  if (!fOutput) {
796  Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
797  return -1;
798  }
799 
800  PDB(kOutput, 1)
801  Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
802  queryend, force, fSavePartialResults, fSaveResultsPerPacket);
803 
804  // Get list of processed packets from the iterator
805  PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
806 
807  TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
808  PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
809  packets, (packets ? packets->GetSize(): -1));
810 
811  // Open the file
812  const char *oopt = "UPDATE";
813  // Check if the file has already been defined
814  TString baseName(fOutputFilePath);
815  if (fOutputFilePath.IsNull()) {
816  baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
817  if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
818  fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
820  } else {
821  fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
822  }
823  Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
824  oopt = "RECREATE";
825  }
826  // Open the file in write mode
827  if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
828  (fOutputFile && fOutputFile->IsZombie())) {
829  Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
831  return -1;
832  }
833 
834  // Save current directory
835  TDirectory *curdir = gDirectory;
836  fOutputFile->cd();
837 
838  // Write first the packets list, if required
839  if (packets) {
840  TDirectory *packetsDir = fOutputFile->mkdir("packets");
841  if (packetsDir) packetsDir->cd();
843  fOutputFile->cd();
844  }
845 
846  Bool_t notempty = kFALSE;
847  // Write out the output list
848  TList torm;
849  TIter nxo(fOutput);
850  TObject *o = 0;
851  while ((o = nxo())) {
852  // Skip output file drivers
853  if (o->InheritsFrom(TProofOutputFile::Class())) continue;
854  // Skip control objets
855  if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
856  // Skip data members mapping
858  // Skip missing file info
859  if (!strcmp(o->GetName(), "MissingFiles")) continue;
860  // Trees need a special treatment
861  if (o->InheritsFrom("TTree")) {
862  TTree *t = (TTree *) o;
863  TDirectory *d = t->GetDirectory();
864  // If the tree is not attached to any file ...
865  if (!d || (d && !d->InheritsFrom("TFile"))) {
866  // ... we attach it
868  }
869  if (t->GetDirectory() == fOutputFile) {
870  if (queryend) {
871  // ... we write it out
872  o->Write(0, TObject::kOverwrite);
873  // At least something in the file
874  notempty = kTRUE;
875  // Flag for removal from the outputlist
876  torm.Add(o);
877  // Prevent double-deletion attempts
878  t->SetDirectory(0);
879  } else {
880  // ... or we set in automatic flush mode
881  t->SetAutoFlush();
882  }
883  }
884  } else if (queryend || fSaveResultsPerPacket) {
885  // Save overwriting what's already there
886  o->Write(0, TObject::kOverwrite);
887  // At least something in the file
888  notempty = kTRUE;
889  // Flag for removal from the outputlist
890  if (queryend) torm.Add(o);
891  }
892  }
893 
894  // Restore previous directory
895  gDirectory = curdir;
896 
897  // Close the file if required
898  if (notempty) {
899  if (!fOutput->FindObject(baseName)) {
900  TProofOutputFile *po = 0;
901  // Get directions
902  TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
903  TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
904  if (nm && oname.BeginsWith("ds:")) {
905  oname.Replace(0, 3, "");
906  TString qtag =
908  oname.ReplaceAll("<qtag>", qtag);
909  // Create the TProofOutputFile for dataset creation
910  po = new TProofOutputFile(baseName, "DRO", oname.Data());
911  } else {
912  Bool_t hasddir = kFALSE;
913  // Create the TProofOutputFile for automatic merging
914  po = new TProofOutputFile(baseName, "M");
915  if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
916  if (gProofServ->IsTopMaster()) {
917  if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
918  TString dsrv;
920  TProofServ::FilterLocalroot(oname, dsrv);
921  oname.Insert(0, dsrv);
922  }
923  } else {
924  if (nm) {
925  // The name has been sent by the client: resolve local place holders
926  oname.ReplaceAll("<file>", baseName);
927  } else {
928  // We did not get any indication; the final file will be in the datadir on
929  // the top master and it will be resolved there
930  oname.Form("<datadir>/%s", baseName.Data());
931  hasddir = kTRUE;
932  }
933  }
934  po->SetOutputFileName(oname.Data());
935  if (hasddir)
936  // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
938  po->SetName(gSystem->BaseName(oname.Data()));
939  }
940  po->AdoptFile(fOutputFile);
941  fOutput->Add(po);
942  // Flag the nature of this file
944  }
945  }
946  fOutputFile->Close();
948 
949  // If last call, cleanup the output list from objects saved to file
950  if (queryend && torm.GetSize() > 0) {
951  TIter nxrm(&torm);
952  while ((o = nxrm())) { fOutput->Remove(o); }
953  }
954  torm.SetOwner(kFALSE);
955 
956  PDB(kOutput, 1)
957  Info("SavePartialResults", "partial results saved to file");
958  // We are done
959  return 0;
960 }
961 
962 ////////////////////////////////////////////////////////////////////////////////
963 /// Make sure that a valid selector object
964 /// Return -1 in case of problems, 0 otherwise
965 
966 Int_t TProofPlayer::AssertSelector(const char *selector_file)
967 {
968  if (selector_file && strlen(selector_file)) {
970 
971  // Get selector files from cache
972  TString ocwd = gSystem->WorkingDirectory();
973  if (gProofServ) {
976  }
977 
978  fSelector = TSelector::GetSelector(selector_file);
979 
980  if (gProofServ) {
981  gSystem->ChangeDirectory(ocwd);
983  }
984 
985  if (!fSelector) {
986  Error("AssertSelector", "cannot load: %s", selector_file );
987  return -1;
988  }
989 
991  Info("AssertSelector", "Processing via filename (%s)", selector_file);
992  } else if (!fSelector) {
993  Error("AssertSelector", "no TSelector object define : cannot continue!");
994  return -1;
995  } else {
996  Info("AssertSelector", "Processing via TSelector object");
997  }
998  // Done
999  return 0;
1000 }
1001 ////////////////////////////////////////////////////////////////////////////////
1002 /// Update fProgressStatus
1003 
1005 {
1006  if (fProgressStatus) {
1011  if (gMonitoringWriter)
1014  fProcessedRun = 0;
1015  }
1016 }
1017 
1018 ////////////////////////////////////////////////////////////////////////////////
1019 /// Process specified TDSet on PROOF worker.
1020 /// The return value is -1 in case of error and TSelector::GetStatus()
1021 /// in case of success.
1022 
1023 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1024  Option_t *option, Long64_t nentries,
1025  Long64_t first)
1026 {
1027  PDB(kGlobal,1) Info("Process","Enter");
1028 
1030  fOutput = 0;
1031 
1032  TCleanup clean(this);
1033 
1034  fSelectorClass = 0;
1035  TString wmsg;
1036  TRY {
1037  if (AssertSelector(selector_file) != 0 || !fSelector) {
1038  Error("Process", "cannot assert the selector object");
1039  return -1;
1040  }
1041 
1042  fSelectorClass = fSelector->IsA();
1043  Int_t version = fSelector->Version();
1044  if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1045 
1047 
1048  if (gProofServ)
1050 
1051  fSelStatus = new TStatus;
1053 
1054  fSelector->SetOption(option);
1056 
1057  // If in sequential (0-PROOF) mode validate the data set to get
1058  // the number of entries
1060  if (fTotalEvents < 0 && gProofServ &&
1062  dset->Validate();
1063  dset->Reset();
1064  TDSetElement *e = 0;
1065  while ((e = dset->Next())) {
1066  fTotalEvents += e->GetNum();
1067  }
1068  }
1069 
1070  dset->Reset();
1071 
1072  // Set parameters controlling the iterator behaviour
1073  Int_t useTreeCache = 1;
1074  if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1075  if (useTreeCache > -1 && useTreeCache < 2)
1076  gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1077  }
1078  Long64_t cacheSize = -1;
1079  if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1080  TString sz = TString::Format("%lld", cacheSize);
1081  gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1082  }
1083  // Parallel unzipping
1084  Int_t useParallelUnzip = 0;
1085  if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1086  if (useParallelUnzip > -1 && useParallelUnzip < 2)
1087  gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1088  }
1089  // OS file caching (Mac Os X only)
1090  Int_t dontCacheFiles = 0;
1091  if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1092  if (dontCacheFiles == 1)
1093  gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1094  }
1096 
1097  // Control file object swap
1098  // <how>*10 + <force>
1099  // <how> = 0 end of run
1100  // 1 after each packet
1101  // <force> = 0 no, swap only if memory threshold is reached
1102  // 1 swap in all cases, accordingly to <how>
1103  Int_t opt = 0;
1104  if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1105  opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1106  }
1107  fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1108  fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1109  Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1110 
1111  // Memory threshold for file object swap
1112  Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1113  if (memfrac > 0.) {
1114  // The threshold is per core
1115  SysInfo_t si;
1116  if (gSystem->GetSysInfo(&si) == 0) {
1117  fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1118  Info("Process", "memory threshold for saving objects to file set to %ld kB",
1120  } else {
1121  Error("Process", "cannot get SysInfo_t (!)");
1122  }
1123  }
1124 
1125  if (version == 0) {
1126  PDB(kLoop,1) Info("Process","Call Begin(0)");
1127  fSelector->Begin(0);
1128  } else {
1129  if (IsClient()) {
1130  // on client (for local run)
1131  PDB(kLoop,1) Info("Process","Call Begin(0)");
1132  fSelector->Begin(0);
1133  }
1135  PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1136  fSelector->SlaveBegin(0); // Init is called explicitly
1137  // from GetNextEvent()
1138  }
1139  }
1140 
1141  } CATCH(excode) {
1143  Error("Process","exception %d caught", excode);
1145  return -1;
1146  } ENDTRY;
1147 
1148  // Save the results, if needed, closing the file
1149  if (SavePartialResults(kFALSE) < 0)
1150  Warning("Process", "problems seetting up file-object swapping");
1151 
1152  // Create feedback lists, if required
1153  SetupFeedback();
1154 
1155  if (gMonitoringWriter)
1157 
1158  PDB(kLoop,1)
1159  Info("Process","Looping over Process()");
1160 
1161  // get the byte read counter at the beginning of processing
1164  fProcessedRun = 0;
1165  // force the first monitoring info
1166  if (gMonitoringWriter)
1168 
1169  // Start asynchronous timer to dispatch pending events
1171 
1172  // Loop over range
1173  gAbort = kFALSE;
1174  Long64_t entry;
1177 
1178  TRY {
1179 
1180  Int_t mrc = -1;
1181  // Get the frequency for checking memory consumption and logging information
1182  Long64_t memlogfreq = -1;
1183  if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1184  Long64_t singleshot = 1;
1185  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1186  TString lastMsg("(unfortunately no detailed info is available about current packet)");
1187 
1188  // Initial memory footprint
1189  if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1190  Error("Process", "%s", wmsg.Data());
1191  wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ ? gProofServ->GetOrdinal() : "gProofServ is nullptr"));
1192  fSelStatus->Add(wmsg.Data());
1193  if (gProofServ) {
1194  gProofServ->SendAsynMessage(wmsg.Data());
1196  }
1199  } else if (!wmsg.IsNull()) {
1200  Warning("Process", "%s", wmsg.Data());
1201  }
1202 
1203  TPair *currentElem = 0;
1204  // The event loop on the worker
1205  Long64_t fst = -1, num;
1206  Long_t maxproctime = -1;
1207  Bool_t newrun = kFALSE;
1208  while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1211  // This is needed by the inflate infrastructure to calculate
1212  // sleeping times
1214 
1215  // Give the possibility to the selector to access additional info in the
1216  // incoming packet
1217  if (dset->Current()) {
1218  if (!currentElem) {
1219  currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1220  fInput->Add(currentElem);
1221  } else {
1222  if (currentElem->Value() != dset->Current()) {
1223  currentElem->SetValue(dset->Current());
1224  } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1226  }
1227  }
1228  if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1229  if (dset->TestBit(TDSet::kEmpty)) {
1230  lastMsg = "check logs for possible stacktrace - last cycle:";
1231  } else {
1232  TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1233  TString fn = (elem) ? elem->GetFileName() : "<undef>";
1234  lastMsg.Form("while processing dset:'%s', file:'%s'"
1235  " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1236  }
1237  TProofServ::SetLastMsg(lastMsg);
1238  }
1239  // Set the max proc time, if any
1240  if (dset->Current()->GetMaxProcTime() >= 0.)
1241  maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1242  newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1243  }
1244 
1247  // Setup packet proc time measurement
1248  if (maxproctime > 0) {
1249  if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1250  fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1251  if (!fProcTime) fProcTime = new TStopwatch();
1252  fProcTime->Reset(); // Reset counters
1253  }
1254  Long64_t refnum = num;
1255  if (refnum < 0 && maxproctime <= 0) {
1256  wmsg.Form("neither entries nor max proc time specified:"
1257  " risk of infinite loop: processing aborted");
1258  Error("Process", "%s", wmsg.Data());
1259  if (gProofServ) {
1260  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1262  gProofServ->SendAsynMessage(wmsg.Data());
1263  }
1266  break;
1267  }
1268  while (refnum < 0 || num--) {
1269 
1270  // Did we use all our time?
1272  fProcTime->Stop();
1273  if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1274  // How much are we left with?
1275  Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1276  if (xleft < 0.2) {
1277  // Give another try, 1.5 times the remaining measured expected time
1278  Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1280  fProcTimeTimer->Start(mpt, kTRUE); // One shot
1282  }
1283  }
1285  Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1286  maxproctime, lastMsg.Data());
1287 
1288  break;
1289  }
1290  }
1291 
1292  if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1293  fSelector->GetAbort() == TSelector::kContinue)) break;
1294 
1295  // Get the netry number, taking into account entry or event lists
1296  entry = fEvIter->GetEntryNumber(fst);
1297  fst++;
1298 
1299  // Set the last entry
1300  TProofServ::SetLastEntry(entry);
1301 
1302  if (fSelector->Version() == 0) {
1303  PDB(kLoop,3)
1304  Info("Process","Call ProcessCut(%lld)", entry);
1305  if (fSelector->ProcessCut(entry)) {
1306  PDB(kLoop,3)
1307  Info("Process","Call ProcessFill(%lld)", entry);
1308  fSelector->ProcessFill(entry);
1309  }
1310  } else {
1311  PDB(kLoop,3)
1312  Info("Process","Call Process(%lld)", entry);
1313  fSelector->Process(entry);
1316  break;
1317  } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1318  Info("Process", "packet processing aborted following the"
1319  " selector settings:\n%s", lastMsg.Data());
1322  }
1323  }
1325 
1326  // Check the memory footprint, if required
1327  if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1328  if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1329  Error("Process", "%s", wmsg.Data());
1330  if (gProofServ) {
1331  wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1332  gProofServ->GetOrdinal(), entry));
1333  gProofServ->SendAsynMessage(wmsg.Data());
1334  }
1338  break;
1339  } else {
1340  if (!wmsg.IsNull()) {
1341  Warning("Process", "%s", wmsg.Data());
1342  if (gProofServ) {
1343  wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1344  gProofServ->GetOrdinal(), entry));
1345  gProofServ->SendAsynMessage(wmsg.Data());
1346  }
1347  }
1348  }
1349  }
1353  }
1355  if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1356 
1357  // Make sure that the selector abort status is reset
1359  fSelector->Abort("status reset", TSelector::kContinue);
1360  }
1361  }
1362 
1363  } CATCH(excode) {
1364  if (excode == kPEX_STOPPED) {
1365  Info("Process","received stop-process signal");
1367  } else if (excode == kPEX_ABORTED) {
1368  gAbort = kTRUE;
1369  Info("Process","received abort-process signal");
1371  } else {
1372  Error("Process","exception %d caught", excode);
1373  // Perhaps we need a dedicated status code here ...
1374  gAbort = kTRUE;
1376  }
1378  } ENDTRY;
1379 
1380  // Clean-up the envelop for the current element
1381  TPair *currentElem = 0;
1382  if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1383  if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1384  delete currentElem->Key();
1385  delete currentElem;
1386  }
1387  }
1388 
1389  // Final memory footprint
1390  Long64_t singleshot = 1;
1391  Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1392  Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1393  if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1394 
1395  PDB(kGlobal,2)
1396  Info("Process","%lld events processed", fProgressStatus->GetEntries());
1397 
1398  if (gMonitoringWriter) {
1402  }
1403 
1404  // Stop active timers
1406  if (fStopTimer != 0)
1408  if (fFeedbackTimer != 0)
1409  HandleTimer(0);
1410 
1411  StopFeedback();
1412 
1413  // Save the results, if needed, closing the file
1414  if (SavePartialResults(kTRUE) < 0)
1415  Warning("Process", "problems saving the results to file");
1416 
1418 
1419  // Finalize
1420 
1421  if (fExitStatus != kAborted) {
1422 
1423  TIter nxo(GetOutputList());
1424  TObject *o = 0;
1425  while ((o = nxo())) {
1426  // Special treatment for files
1427  if (o->IsA() == TProofOutputFile::Class()) {
1429  of->Print();
1431  const char *dir = of->GetDir();
1432  if (!dir || (dir && strlen(dir) <= 0)) {
1434  } else if (dir && strlen(dir) > 0) {
1435  TUrl u(dir);
1436  if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1437  !strcmp(u.GetHost(), "localhost.localdomain")) {
1439  of->SetDir(u.GetUrl(kTRUE));
1440  }
1441  of->Print();
1442  }
1443  }
1444  }
1445 
1447 
1449  if (fSelector->Version() == 0) {
1450  PDB(kLoop,1) Info("Process","Call Terminate()");
1451  fSelector->Terminate();
1452  } else {
1453  PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1456  PDB(kLoop,1) Info("Process","Call Terminate()");
1457  fSelector->Terminate();
1458  }
1459  }
1460  }
1461 
1462  // Add Selector status in the output list so it can be returned to the client as done
1463  // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1464  fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1465 
1466  if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1467  TIter nxc(gROOT->GetListOfCanvases());
1468  while (TObject *c = nxc())
1469  fOutput->Add(c);
1470  }
1471  }
1472 
1473  if (gProofServ)
1474  TPerfStats::Stop();
1475 
1476  return 0;
1477 }
1478 
1479 ////////////////////////////////////////////////////////////////////////////////
1480 /// Process specified TDSet on PROOF worker with TSelector object
1481 /// The return value is -1 in case of error and TSelector::GetStatus()
1482 /// in case of success.
1483 
1485  Option_t *option, Long64_t nentries,
1486  Long64_t first)
1487 {
1488  if (!selector) {
1489  Error("Process", "selector object undefiend!");
1490  return -1;
1491  }
1492 
1494  fSelector = selector;
1496  return Process(dset, (const char *)0, option, nentries, first);
1497 }
1498 
1499 ////////////////////////////////////////////////////////////////////////////////
1500 /// Not implemented: meaningful only in the remote player. Returns kFALSE.
1501 
1503 {
1504  return kFALSE;
1505 }
1506 
1507 ////////////////////////////////////////////////////////////////////////////////
1508 /// Check the memory usage, if requested.
1509 /// Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
1510 /// resident limits are depassed.
1511 
1513  Bool_t &w80v, TString &wmsg)
1514 {
1515  Long64_t processed = GetEventsProcessed() + fProcessedRun;
1516  if (mfreq > 0 && processed%mfreq == 0) {
1517  // Record the memory information
1518  ProcInfo_t pi;
1519  if (!gSystem->GetProcInfo(&pi)){
1520  wmsg = "";
1521  if (gProofServ)
1522  Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1523  pi.fMemVirtual, pi.fMemResident, processed);
1524  // Save info in TStatus
1525  fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1526  // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
1527  if (TProofServ::GetVirtMemMax() > 0) {
1528  if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
1529  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1530  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1531  return kFALSE;
1532  } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1533  // Refine monitoring
1534  mfreq = 1;
1535  wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1536  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1537  w80v = kFALSE;
1538  }
1539  }
1540  // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
1541  if (TProofServ::GetResMemMax() > 0) {
1542  if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
1543  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1544  " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1545  return kFALSE;
1546  } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1547  // Refine monitoring
1548  mfreq = 1;
1549  if (wmsg.Length() > 0) {
1550  wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1551  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1552  } else {
1553  wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1554  (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1555  }
1556  w80r = kFALSE;
1557  }
1558  }
1559  // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1560  // to TSystem::GetProcInfo in SavePartialResults
1561  if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1562  }
1563  }
1564  // Done
1565  return kTRUE;
1566 }
1567 
1568 ////////////////////////////////////////////////////////////////////////////////
1569 /// Finalize query (may not be used in this class).
1570 
1572 {
1573  MayNotUse("Finalize");
1574  return -1;
1575 }
1576 
1577 ////////////////////////////////////////////////////////////////////////////////
1578 /// Finalize query (may not be used in this class).
1579 
1581 {
1582  MayNotUse("Finalize");
1583  return -1;
1584 }
1585 ////////////////////////////////////////////////////////////////////////////////
1586 /// Merge output (may not be used in this class).
1587 
1589 {
1590  MayNotUse("MergeOutput");
1591  return;
1592 }
1593 
1594 ////////////////////////////////////////////////////////////////////////////////
1595 
1597 {
1599  fOutput->Add(olsdm);
1600 }
1601 
1602 ////////////////////////////////////////////////////////////////////////////////
1603 /// Update automatic binning parameters for given object "name".
1604 
1608  Double_t& zmin, Double_t& zmax)
1609 {
1610  if ( fAutoBins == 0 ) {
1611  fAutoBins = new THashList;
1612  }
1613 
1614  TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1615 
1616  if ( val == 0 ) {
1617  //look for info in higher master
1618  if (gProofServ && !gProofServ->IsTopMaster()) {
1619  TString key = name;
1621  }
1622 
1623  val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1624  fAutoBins->Add(val);
1625  } else {
1626  val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1627  }
1628 }
1629 
1630 ////////////////////////////////////////////////////////////////////////////////
1631 /// Get next packet (may not be used in this class).
1632 
1634 {
1635  MayNotUse("GetNextPacket");
1636  return 0;
1637 }
1638 
1639 ////////////////////////////////////////////////////////////////////////////////
1640 /// Set up feedback (may not be used in this class).
1641 
1643 {
1644  MayNotUse("SetupFeedback");
1645 }
1646 
1647 ////////////////////////////////////////////////////////////////////////////////
1648 /// Stop feedback (may not be used in this class).
1649 
1651 {
1652  MayNotUse("StopFeedback");
1653 }
1654 
1655 ////////////////////////////////////////////////////////////////////////////////
1656 /// Draw (may not be used in this class).
1657 
1658 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1659  const char * /*selection*/, Option_t * /*option*/,
1660  Long64_t /*nentries*/, Long64_t /*firstentry*/)
1661 {
1662  MayNotUse("DrawSelect");
1663  return -1;
1664 }
1665 
1666 ////////////////////////////////////////////////////////////////////////////////
1667 /// Handle tree header request.
1668 
1670 {
1671  MayNotUse("HandleGetTreeHeader|");
1672 }
1673 
1674 ////////////////////////////////////////////////////////////////////////////////
1675 /// Receive histo from slave.
1676 
1678 {
1679  TObject *obj = mess->ReadObject(mess->GetClass());
1680  if (obj->InheritsFrom(TH1::Class())) {
1681  TH1 *h = (TH1*)obj;
1682  h->SetDirectory(0);
1683  TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1684  if (horg)
1685  horg->Add(h);
1686  else
1687  h->SetDirectory(gDirectory);
1688  }
1689 }
1690 
1691 ////////////////////////////////////////////////////////////////////////////////
1692 /// Draw the object if it is a canvas.
1693 /// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1694 /// is not available.
1695 
1697 {
1698  static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1699 
1700  // Load the library the first time
1701  if (!gDrawCanvasHook) {
1702  // Load library needed for graphics ...
1703  TString drawlib = "libProofDraw";
1704  char *p = 0;
1705  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1706  delete[] p;
1707  if (gSystem->Load(drawlib) != -1) {
1708  // Locate DrawCanvas
1709  Func_t f = 0;
1710  if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1711  gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1712  else
1713  Warning("DrawCanvas", "can't find DrawCanvas");
1714  } else
1715  Warning("DrawCanvas", "can't load %s", drawlib.Data());
1716  } else
1717  Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1718  }
1719  if (gDrawCanvasHook && obj)
1720  return (*gDrawCanvasHook)(obj);
1721  // No drawing hook or object undefined
1722  return 1;
1723 }
1724 
1725 ////////////////////////////////////////////////////////////////////////////////
1726 /// Parse the arguments from var, sel and opt and fill the selector and
1727 /// object name accordingly.
1728 /// Return 0 in case of success, 1 if libProofDraw is not available.
1729 
1730 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1731  TString &selector, TString &objname)
1732 {
1733  static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1734  TString &, TString &) = 0;
1735 
1736  // Load the library the first time
1737  if (!gGetDrawArgsHook) {
1738  // Load library needed for graphics ...
1739  TString drawlib = "libProofDraw";
1740  char *p = 0;
1741  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1742  delete[] p;
1743  if (gSystem->Load(drawlib) != -1) {
1744  // Locate GetDrawArgs
1745  Func_t f = 0;
1746  if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1747  gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1748  TString &, TString &))(f);
1749  else
1750  Warning("GetDrawArgs", "can't find GetDrawArgs");
1751  } else
1752  Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1753  } else
1754  Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1755  }
1756  if (gGetDrawArgsHook)
1757  return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1758  // No parser hook or object undefined
1759  return 1;
1760 }
1761 
1762 ////////////////////////////////////////////////////////////////////////////////
1763 /// Create/destroy a named canvas for feedback
1764 
1765 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
1766 {
1767  static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1768 
1769  // Load the library the first time
1770  if (!gFeedBackCanvasHook) {
1771  // Load library needed for graphics ...
1772  TString drawlib = "libProofDraw";
1773  char *p = 0;
1774  if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1775  delete[] p;
1776  if (gSystem->Load(drawlib) != -1) {
1777  // Locate FeedBackCanvas
1778  Func_t f = 0;
1779  if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1780  gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1781  else
1782  Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1783  } else
1784  Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1785  } else
1786  Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1787  }
1788  if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1789  // No parser hook or object undefined
1790  return;
1791 }
1792 
1793 ////////////////////////////////////////////////////////////////////////////////
1794 /// Return the size in bytes of the cache
1795 
1797 {
1798  if (fEvIter) return fEvIter->GetCacheSize();
1799  return -1;
1800 }
1801 
1802 ////////////////////////////////////////////////////////////////////////////////
1803 /// Return the number of entries in the learning phase
1804 
1806 {
1807  if (fEvIter) return fEvIter->GetLearnEntries();
1808  return -1;
1809 }
1810 
1811 ////////////////////////////////////////////////////////////////////////////////
1812 /// Switch on/off merge timer
1813 
1815 {
1816  if (on) {
1817  if (!fMergeSTW) fMergeSTW = new TStopwatch();
1818  PDB(kGlobal,1)
1819  Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1820  if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1822  } else if (fMergeSTW) {
1823  fMergeSTW->Stop();
1824  Float_t rt = fMergeSTW->RealTime();
1825  PDB(kGlobal,1)
1826  Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1827  if (fQuery) {
1828  if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1829  // On the master (or in Lite()) we set the merging time and the numebr of mergers
1830  fQuery->SetMergeTime(rt);
1832  } else {
1833  // In a standard client we save the transfer-to-client time
1834  fQuery->SetRecvTime(rt);
1835  }
1836  PDB(kGlobal,2) fQuery->Print("F");
1837  }
1838  }
1839 }
1840 
1841 //------------------------------------------------------------------------------
1842 
1844 
1845 ////////////////////////////////////////////////////////////////////////////////
1846 /// Process the specified TSelector object 'nentries' times.
1847 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1848 /// local session.
1849 /// The return value is -1 in case of error and TSelector::GetStatus()
1850 /// in case of success.
1851 
1853  Long64_t nentries, Option_t *option)
1854 {
1855  if (!selector) {
1856  Error("Process", "selector object undefiend!");
1857  return -1;
1858  }
1859 
1860  TDSetProxy *set = new TDSetProxy("", "", "");
1861  set->SetBit(TDSet::kEmpty);
1862  set->SetBit(TDSet::kIsLocal);
1863  Long64_t rc = Process(set, selector, option, nentries);
1864  SafeDelete(set);
1865 
1866  // Done
1867  return rc;
1868 }
1869 
1870 ////////////////////////////////////////////////////////////////////////////////
1871 /// Process the specified TSelector file 'nentries' times.
1872 /// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1873 /// local session.
1874 /// Process specified TDSet on PROOF worker with TSelector object
1875 /// The return value is -1 in case of error and TSelector::GetStatus()
1876 /// in case of success.
1877 
1879  Long64_t nentries, Option_t *option)
1880 {
1881  TDSetProxy *set = new TDSetProxy("", "", "");
1882  set->SetBit(TDSet::kEmpty);
1883  set->SetBit(TDSet::kIsLocal);
1884  Long64_t rc = Process(set, selector, option, nentries);
1885  SafeDelete(set);
1886 
1887  // Done
1888  return rc;
1889 }
1890 
1891 
1892 //------------------------------------------------------------------------------
1893 
1895 
1896 ////////////////////////////////////////////////////////////////////////////////
1897 /// Destructor.
1898 
1900 {
1901  SafeDelete(fOutput); // owns the output list
1903 
1904  // Objects stored in maps are already deleted when merging the feedback
1907 
1909 }
1910 
1911 ////////////////////////////////////////////////////////////////////////////////
1912 /// Init the packetizer
1913 /// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1914 
1916  Long64_t first, const char *defpackunit,
1917  const char *defpackdata)
1918 {
1920  PDB(kGlobal,1) Info("Process","Enter");
1921  fDSet = dset;
1923 
1924  // This is done here to pickup on the fly changes
1925  Int_t honebyone = 1;
1926  if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1927  honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1928  fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1929 
1930  Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1931 
1932  TString packetizer;
1933  TList *listOfMissingFiles = 0;
1934 
1935  TMethodCall callEnv;
1936  TClass *cl;
1937  noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1938 
1939  if (noData) {
1940 
1941  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1942  packetizer = defpackunit;
1943  else
1944  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1945 
1946  // Get linked to the related class
1947  cl = TClass::GetClass(packetizer);
1948  if (cl == 0) {
1949  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1951  return -1;
1952  }
1953 
1954  // Init the constructor
1955  callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1956  if (!callEnv.IsValid()) {
1957  Error("InitPacketizer",
1958  "cannot find correct constructor for '%s'", cl->GetName());
1960  return -1;
1961  }
1962  callEnv.ResetParam();
1964  callEnv.SetParam((Long64_t) nentries);
1965  callEnv.SetParam((Long_t) fInput);
1966  callEnv.SetParam((Long_t) fProgressStatus);
1967 
1968  } else if (dset->TestBit(TDSet::kMultiDSet)) {
1969 
1970  // We have to process many datasets in one go, keeping them separate
1971  if (fProof->GetRunStatus() != TProof::kRunning) {
1972  // We have been asked to stop
1973  Error("InitPacketizer", "received stop/abort request");
1975  return -1;
1976  }
1977 
1978  // The multi packetizer
1979  packetizer = "TPacketizerMulti";
1980 
1981  // Get linked to the related class
1982  cl = TClass::GetClass(packetizer);
1983  if (cl == 0) {
1984  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1986  return -1;
1987  }
1988 
1989  // Init the constructor
1990  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1991  if (!callEnv.IsValid()) {
1992  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1994  return -1;
1995  }
1996  callEnv.ResetParam();
1997  callEnv.SetParam((Long_t) dset);
1999  callEnv.SetParam((Long64_t) first);
2000  callEnv.SetParam((Long64_t) nentries);
2001  callEnv.SetParam((Long_t) fInput);
2002  callEnv.SetParam((Long_t) fProgressStatus);
2003 
2004  // We are going to test validity during the packetizer initialization
2005  dset->SetBit(TDSet::kValidityChecked);
2006  dset->ResetBit(TDSet::kSomeInvalid);
2007 
2008  } else {
2009 
2010  // Lookup - resolve the end-point urls to optmize the distribution.
2011  // The lookup was previously called in the packetizer's constructor.
2012  // A list for the missing files may already have been added to the
2013  // output list; otherwise, if needed it will be created inside
2014  if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2015  // Move it to the output list
2016  fInput->Remove(listOfMissingFiles);
2017  } else {
2018  listOfMissingFiles = new TList;
2019  }
2020  // Do the lookup; we only skip it if explicitly requested so.
2021  TString lkopt;
2022  if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2023  dset->Lookup(kTRUE, &listOfMissingFiles);
2024 
2025  if (fProof->GetRunStatus() != TProof::kRunning) {
2026  // We have been asked to stop
2027  Error("InitPacketizer", "received stop/abort request");
2029  return -1;
2030  }
2031 
2032  if (!(dset->GetListOfElements()) ||
2033  !(dset->GetListOfElements()->GetSize())) {
2034  if (gProofServ)
2035  gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2036  Error("InitPacketizer", "No files from the data set were found - Aborting");
2038  if (listOfMissingFiles) {
2039  listOfMissingFiles->SetOwner();
2040  fOutput->Remove(listOfMissingFiles);
2041  SafeDelete(listOfMissingFiles);
2042  }
2043  return -1;
2044  }
2045 
2046  if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2047  // Using standard packetizer TAdaptivePacketizer
2048  packetizer = defpackdata;
2049  else
2050  Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2051 
2052  // Get linked to the related class
2053  cl = TClass::GetClass(packetizer);
2054  if (cl == 0) {
2055  Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2057  return -1;
2058  }
2059 
2060  // Init the constructor
2061  callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2062  if (!callEnv.IsValid()) {
2063  Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2065  return -1;
2066  }
2067  callEnv.ResetParam();
2068  callEnv.SetParam((Long_t) dset);
2070  callEnv.SetParam((Long64_t) first);
2071  callEnv.SetParam((Long64_t) nentries);
2072  callEnv.SetParam((Long_t) fInput);
2073  callEnv.SetParam((Long_t) fProgressStatus);
2074 
2075  // We are going to test validity during the packetizer initialization
2076  dset->SetBit(TDSet::kValidityChecked);
2077  dset->ResetBit(TDSet::kSomeInvalid);
2078  }
2079 
2080  // Get an instance of the packetizer
2081  Long_t ret = 0;
2082  callEnv.Execute(ret);
2083  if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2084  Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2086  return -1;
2087  }
2088 
2089  if (!fPacketizer->IsValid()) {
2090  Error("InitPacketizer",
2091  "instantiated packetizer object '%s' is invalid", cl->GetName());
2094  return -1;
2095  }
2096 
2097  // In multi mode retrieve the list of missing files
2098  if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2099  if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2100  // Remove it; it will be added to the output list
2101  fInput->Remove(listOfMissingFiles);
2102  }
2103  }
2104 
2105  if (!noData) {
2106  // Add invalid elements to the list of missing elements
2107  TDSetElement *elem = 0;
2108  if (dset->TestBit(TDSet::kSomeInvalid)) {
2109  TIter nxe(dset->GetListOfElements());
2110  while ((elem = (TDSetElement *)nxe())) {
2111  if (!elem->GetValid()) {
2112  if (!listOfMissingFiles)
2113  listOfMissingFiles = new TList;
2114  listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2115  dset->Remove(elem, kFALSE);
2116  }
2117  }
2118  // The invalid elements have been removed
2120  }
2121 
2122  // Record the list of missing or invalid elements in the output list
2123  if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2124  TIter missingFiles(listOfMissingFiles);
2125  TString msg;
2126  if (gDebug > 0) {
2127  TFileInfo *fi = 0;
2128  while ((fi = (TFileInfo *) missingFiles.Next())) {
2129  if (fi->GetCurrentUrl()) {
2130  msg = Form("File not found: %s - skipping!",
2131  fi->GetCurrentUrl()->GetUrl());
2132  } else {
2133  msg = Form("File not found: %s - skipping!", fi->GetName());
2134  }
2136  }
2137  }
2138  // Make sure it will be sent back
2139  if (!GetOutput("MissingFiles")) {
2140  listOfMissingFiles->SetName("MissingFiles");
2141  AddOutputObject(listOfMissingFiles);
2142  }
2143  TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2144  if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2145 
2146  // Estimate how much data are missing
2147  Int_t ngood = dset->GetListOfElements()->GetSize();
2148  Int_t nbad = listOfMissingFiles->GetSize();
2149  Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2150  msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2151  " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2152  tmpStatus->Add(msg.Data());
2153  msg = Form(" +++\n"
2154  " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2155  " the 'MissingFiles' list\n"
2156  " +++", xb * 100., '%', nbad, nbad + ngood);
2158  } else {
2159  // Cleanup
2160  SafeDelete(listOfMissingFiles);
2161  }
2162  }
2163 
2164  // Done
2165  return 0;
2166 }
2167 
2168 ////////////////////////////////////////////////////////////////////////////////
2169 /// Process specified TDSet on PROOF.
2170 /// This method is called on client and on the PROOF master.
2171 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2172 /// in case of success.
2173 
2174 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2175  Option_t *option, Long64_t nentries,
2176  Long64_t first)
2177 {
2178  PDB(kGlobal,1) Info("Process", "Enter");
2179 
2180  fDSet = dset;
2182 
2183  if (!fProgressStatus) {
2184  Error("Process", "No progress status");
2185  return -1;
2186  }
2188 
2189  // delete fOutput;
2190  if (!fOutput)
2191  fOutput = new THashList;
2192  else
2193  fOutput->Clear();
2194 
2196 
2197  if (fProof->IsMaster()){
2199  } else {
2201  }
2202 
2203  TStopwatch elapsed;
2204 
2205  // Define filename
2206  TString fn;
2207  fSelectorFileName = selector_file;
2208 
2209  if (fCreateSelObj) {
2210  if(!SendSelector(selector_file)) return -1;
2211  fn = gSystem->BaseName(selector_file);
2212  } else {
2213  fn = selector_file;
2214  }
2215 
2216  TMessage mesg(kPROOF_PROCESS);
2217 
2218  // Parse option
2219  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2220 
2221  TList *inputtmp = 0; // List of temporary input objects
2222  TDSet *set = dset;
2223  if (fProof->IsMaster()) {
2224 
2225  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2226  set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2227  dset->GetDirectory() );
2228  if (dset->TestBit(TDSet::kEmpty))
2229  set->SetBit(TDSet::kEmpty);
2230 
2231  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2232  Error("Process", "cannot init the packetizer");
2234  return -1;
2235  }
2236 
2237  // Reset start, this is now managed by the packetizer
2238  first = 0;
2239 
2240  // Negative memlogfreq disable checks.
2241  // If 0 is passed we try to have 100 messages about memory
2242  // Otherwise we use the frequency passed.
2243  Int_t mrc = -1;
2244  Long64_t memlogfreq = -1, mlf;
2245  if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2246  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2247  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2248  }
2249  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2250  if (memlogfreq == 0) {
2251  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2252  if (memlogfreq <= 0) memlogfreq = 1;
2253  }
2254  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2255 
2256 
2257  // Send input data, if any
2258  TString emsg;
2259  if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2260  Warning("Process", "could not forward input data: %s", emsg.Data());
2261 
2262  // Attach to the transient histogram with the assigned packets, if required
2263  if (fInput->FindObject("PROOF_StatsHist") != 0) {
2264  if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2265  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2266  } else {
2267  PDB(kLoop,1)
2268  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2269  " packets being processed");
2270  }
2271  }
2272 
2273  } else {
2274 
2275  // Check whether we have to enforce the use of submergers
2276  if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2277  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2278  if (smg >= 0) {
2279  fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2280  if (gEnv->Lookup("Proof.MergersByHost")) {
2281  Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2282  if (mbh != 0) {
2283  // Administrator settings have the priority
2284  TObject *o = 0;
2285  if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2286  fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2287  }
2288  }
2289  }
2290  }
2291 
2292  // For a new query clients should make sure that the temporary
2293  // output list is empty
2294  if (fOutputLists) {
2295  fOutputLists->Delete();
2296  delete fOutputLists;
2297  fOutputLists = 0;
2298  }
2299 
2300  if (!sync) {
2302  Printf(" ");
2303  Info("Process","starting new query");
2304  }
2305 
2306  // Define fSelector in Client if processing with filename
2307  if (fCreateSelObj) {
2309  if (!(fSelector = TSelector::GetSelector(selector_file))) {
2310  if (!sync)
2311  gSystem->RedirectOutput(0);
2312  return -1;
2313  }
2314  }
2315 
2316  fSelectorClass = 0;
2317  fSelectorClass = fSelector->IsA();
2318 
2319  // Add fSelector to inputlist if processing with object
2320  if (!fCreateSelObj) {
2321  // In any input list was set into the selector move it to the PROOF
2322  // input list, because we do not want to stream the selector one
2323  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2324  TIter nxi(fSelector->GetInputList());
2325  TObject *o = 0;
2326  while ((o = nxi())) {
2327  if (!fInput->FindObject(o)) {
2328  fInput->Add(o);
2329  if (!inputtmp) {
2330  inputtmp = new TList;
2331  inputtmp->SetOwner(kFALSE);
2332  }
2333  inputtmp->Add(o);
2334  }
2335  }
2336  }
2337  fInput->Add(fSelector);
2338  }
2339  // Set the input list for initialization
2341  fSelector->SetOption(option);
2343 
2344  PDB(kLoop,1) Info("Process","Call Begin(0)");
2345  fSelector->Begin(0);
2346 
2347  // Reset the input list to avoid double streaming and related problems (saving
2348  // the TQueryResult)
2350 
2351  // Send large input data objects, if any
2353 
2354  if (!sync)
2355  gSystem->RedirectOutput(0);
2356  }
2357 
2358  TCleanup clean(this);
2359  SetupFeedback();
2360 
2361  TString opt = option;
2362 
2363  // Old servers need a dedicated streamer
2364  if (fProof->fProtocol < 13)
2365  dset->SetWriteV3(kTRUE);
2366 
2367  // Workers will get the entry ranges from the packetizer
2369  Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2370 
2371  // Entry- or Event- list ?
2372  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2373  : (TEntryList *)0;
2374  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2375  : (TEventList *)0;
2376  if (fProof->fProtocol > 14) {
2377  if (fProcessMessage) delete fProcessMessage;
2379  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2380  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381  } else {
2382  mesg << set << fn << fInput << opt << num << fst << evl << sync;
2383  if (enl)
2384  // Not supported remotely
2385  Warning("Process","entry lists not supported by the server");
2386  }
2387 
2388  // Reset the merging progress information
2389  fProof->ResetMergePrg();
2390 
2391  Int_t nb = fProof->Broadcast(mesg);
2392  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2393  if (fProof->IsLite()) fProof->fNotIdle += nb;
2394 
2395  // Reset streamer choice
2396  if (fProof->fProtocol < 13)
2397  dset->SetWriteV3(kFALSE);
2398 
2399  // Redirect logs from master to special log frame
2400  if (IsClient())
2401  fProof->fRedirLog = kTRUE;
2402 
2403  if (!IsClient()){
2404  // Signal the start of finalize for the memory log grepping
2405  Info("Process|Svc", "Start merging Memory information");
2406  }
2407 
2408  if (!sync) {
2409  if (IsClient()) {
2410  // Asynchronous query: just make sure that asynchronous input
2411  // is enabled and return the prompt
2412  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2413  " activating CollectInputFrom");
2414  fProof->Activate();
2415 
2416  // Receive the acknowledgement and query sequential number
2417  fProof->Collect();
2418 
2419  return fProof->fSeqNum;
2420 
2421  } else {
2422  PDB(kGlobal,1) Info("Process","Calling Collect");
2423  fProof->Collect();
2424 
2425  HandleTimer(0); // force an update of final result
2426  // This forces a last call to TPacketizer::HandleTimer via the second argument
2427  // (the first is ignored). This is needed when some events were skipped so that
2428  // the total number of entries is not the one requested. The packetizer has no
2429  // way in such a case to understand that processing is finished: it must be told.
2430  if (fPacketizer) {
2432  // The progress timer will now stop itself at the next call
2434  // Store process info
2435  elapsed.Stop();
2436  if (fQuery)
2439  elapsed.RealTime());
2440  }
2441  StopFeedback();
2442 
2443  return Finalize(kFALSE,sync);
2444  }
2445  } else {
2446 
2447  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2448  fProof->Collect();
2449  if (!(fProof->IsSync())) {
2450  // The server required to switch to asynchronous mode
2451  Info("Process", "switching to the asynchronous mode ...");
2452  return fProof->fSeqNum;
2453  }
2454 
2455  // Restore prompt logging, for clients (Collect leaves things as they were
2456  // at the time it was called)
2457  if (IsClient())
2458  fProof->fRedirLog = kFALSE;
2459 
2460  if (!IsClient()) {
2461  // Force an update of final result
2462  HandleTimer(0);
2463  // This forces a last call to TPacketizer::HandleTimer via the second argument
2464  // (the first is ignored). This is needed when some events were skipped so that
2465  // the total number of entries is not the one requested. The packetizer has no
2466  // way in such a case to understand that processing is finished: it must be told.
2467  if (fPacketizer) {
2469  // The progress timer will now stop itself at the next call
2471  // Store process info
2472  if (fQuery)
2476  }
2477  } else {
2478  // Set the input list: maybe required at termination
2480  }
2481  StopFeedback();
2482 
2483  Long64_t rc = -1;
2485  rc = Finalize(kFALSE,sync);
2486 
2487  // Remove temporary input objects, if any
2488  if (inputtmp) {
2489  TIter nxi(inputtmp);
2490  TObject *o = 0;
2491  while ((o = nxi())) fInput->Remove(o);
2492  SafeDelete(inputtmp);
2493  }
2494 
2495  // Done
2496  return rc;
2497  }
2498 }
2499 
2500 ////////////////////////////////////////////////////////////////////////////////
2501 /// Process specified TDSet on PROOF.
2502 /// This method is called on client and on the PROOF master.
2503 /// The return value is -1 in case of an error and TSelector::GetStatus() in
2504 /// in case of success.
2505 
2507  Option_t *option, Long64_t nentries,
2508  Long64_t first)
2509 {
2510  if (!selector) {
2511  Error("Process", "selector object undefined");
2512  return -1;
2513  }
2514 
2515  // Define fSelector in Client
2516  if (IsClient() && (selector != fSelector)) {
2518  fSelector = selector;
2519  }
2520 
2522  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2523  fCreateSelObj = kTRUE;
2524 
2525  // Done
2526  return rc;
2527 }
2528 
2529 ////////////////////////////////////////////////////////////////////////////////
2530 /// Prepares the given list of new workers to join a progressing process.
2531 /// Returns kTRUE on success, kFALSE otherwise.
2532 
2534 {
2535  if (!fProcessMessage || !fProof || !fPacketizer) {
2536  Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2538  return kFALSE;
2539  }
2540 
2541  if (!workers || !fProof->IsMaster()) {
2542  Error("Process", "Invalid call");
2543  return kFALSE;
2544  }
2545 
2546  PDB(kGlobal, 1)
2547  Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2548 
2549  // Sends the file associated to the TSelector, if necessary
2550  if (fCreateSelObj) {
2551  PDB(kGlobal, 2)
2552  Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2554  Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2555  return kFALSE;
2556  }
2557  }
2558 
2559  if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2560 
2561  PDB(kGlobal, 2)
2562  Info("Process", "Adding new workers to the packetizer");
2563  if (fPacketizer->AddWorkers(workers) == -1) {
2564  Error("Process", "Cannot add new workers to the packetizer!");
2565  return kFALSE; // TODO: make new wrks inactive
2566  }
2567 
2568  PDB(kGlobal, 2)
2569  Info("Process", "Broadcasting process message to new workers");
2570  fProof->Broadcast(*fProcessMessage, workers);
2571 
2572  // Don't call Collect(): we came here from a global Collect() already which
2573  // will take care of new workers as well
2574 
2575  return kTRUE;
2576 
2577 }
2578 
2579 ////////////////////////////////////////////////////////////////////////////////
2580 /// Merge output in files
2581 
2583 {
2584  PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2585  PDB(kOutput,2) fOutput->ls();
2586 
2587  TList *rmList = 0;
2588  if (fMergeFiles) {
2589  TIter nxo(fOutput);
2590  TObject *o = 0;
2591  TProofOutputFile *pf = 0;
2592  while ((o = nxo())) {
2593  if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2594 
2595  PDB(kOutput,2) pf->Print();
2596 
2597  if (pf->IsMerge()) {
2598 
2599  // Point to the merger
2600  Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2601  TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2602  if (!filemerger) {
2603  Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2604  pf->Print();
2605  continue;
2606  }
2607  // If only one instance the list in the merger is not yet created: do it now
2608  if (!pf->IsMerged()) {
2609  PDB(kOutput,2) pf->Print();
2610  TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2611  filemerger->AddFile(fileLoc);
2612  }
2613  // Datadir
2614  TString ddir, ddopts;
2615  if (gProofServ) {
2616  ddir.Form("%s/", gProofServ->GetDataDir());
2618  }
2619  // Set the output file
2620  TString outfile(pf->GetOutputFileName());
2621  if (outfile.Contains("<datadir>/")) {
2622  outfile.ReplaceAll("<datadir>/", ddir.Data());
2623  if (!ddopts.IsNull())
2624  outfile += TString::Format("?%s", ddopts.Data());
2625  pf->SetOutputFileName(outfile);
2626  }
2627  if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2629  TString srv;
2631  TUrl usrv(srv);
2632  Bool_t localFile = kFALSE;
2633  if (pf->IsRetrieve()) {
2634  // This file will be retrieved by the client: we created it in the data dir
2635  // and save the file URL on the client in the title
2636  if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2637  TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2638  // The output file path on the master
2639  outfile.Form("%s%s", ddir.Data(), bn.Data());
2640  // Save the client path in the title if not defined yet
2641  if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2642  // The file is local
2643  localFile = kTRUE;
2644  } else {
2645  // Check if the file is on the master or elsewhere
2646  if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2647  // Check locality
2648  TUrl uof(outfile.Data(), kTRUE);
2649  TString lfn;
2650  ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2651  if (ftyp == TFile::kLocal && !srv.IsNull()) {
2652  // Check if is a different server
2653  if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2654  usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2655  }
2656  // If it is really local set the file name
2657  if (ftyp == TFile::kLocal) outfile = lfn;
2658  // The file maybe local
2659  if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2660  }
2661  // The remote output file name (the one to be used by the client)
2662  TString outfilerem(outfile);
2663  // For local files we add the local server
2664  if (localFile) {
2665  // Remove prefix, if any, if included and if Xrootd
2666  TProofServ::FilterLocalroot(outfilerem, srv);
2667  outfilerem.Insert(0, srv);
2668  }
2669  // Save the new remote output filename
2670  pf->SetOutputFileName(outfilerem);
2671  // Align the filename
2672  pf->SetFileName(gSystem->BaseName(outfilerem));
2673  }
2674  if (!filemerger->OutputFile(outfile)) {
2675  Error("MergeOutputFiles", "cannot open the output file");
2676  continue;
2677  }
2678  // Merge
2679  PDB(kSubmerger,2) filemerger->PrintFiles("");
2680  if (!filemerger->Merge()) {
2681  Error("MergeOutputFiles", "cannot merge the output files");
2682  continue;
2683  }
2684  // Remove the files
2685  TList *fileList = filemerger->GetMergeList();
2686  if (fileList) {
2687  TIter next(fileList);
2688  TObjString *url = 0;
2689  while((url = (TObjString*)next())) {
2690  TUrl u(url->GetName());
2691  if (!strcmp(u.GetProtocol(), "file")) {
2692  gSystem->Unlink(u.GetFile());
2693  } else {
2694  gSystem->Unlink(url->GetName());
2695  }
2696  }
2697  }
2698  // Reset the merger
2699  filemerger->Reset();
2700 
2701  } else {
2702 
2703  // If not yet merged (for example when having only 1 active worker,
2704  // we need to create the dataset by calling Merge on an effectively empty list
2705  if (!pf->IsMerged()) {
2706  TList dumlist;
2707  dumlist.Add(new TNamed("dum", "dum"));
2708  dumlist.SetOwner(kTRUE);
2709  pf->Merge(&dumlist);
2710  }
2711  // Point to the dataset
2713  if (!fc) {
2714  Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2715  pf->Print();
2716  continue;
2717  }
2718  // Add the collection to the output list for registration and/or to be returned
2719  // to the client
2720  fOutput->Add(fc);
2721  // Do not cleanup at destruction
2722  pf->ResetFileCollection();
2723  // Tell the main thread to register this dataset, if needed
2724  if (pf->IsRegister()) {
2725  TString opt;
2726  if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2727  if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2728  if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2729  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2730  TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2731  fOutput->Add(new TNamed(tag, opt));
2732  }
2733  // Remove this object from the output list and schedule it for distruction
2734  fOutput->Remove(pf);
2735  if (!rmList) rmList = new TList;
2736  rmList->Add(pf);
2737  PDB(kOutput,2) fOutput->Print();
2738  }
2739  }
2740  }
2741  }
2742 
2743  // Remove objects scheduled for removal
2744  if (rmList && rmList->GetSize() > 0) {
2745  TIter nxo(rmList);
2746  TObject *o = 0;
2747  while((o = nxo())) {
2748  fOutput->Remove(o);
2749  }
2750  rmList->SetOwner(kTRUE);
2751  delete rmList;
2752  }
2753 
2754  PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2755 
2756  // Done
2757  return kTRUE;
2758 }
2759 
2760 
2761 ////////////////////////////////////////////////////////////////////////////////
2762 /// Set the selector's data members:
2763 /// find the mapping of data members to otuput list entries in the output list
2764 /// and apply it.
2765 
2767 {
2770  if (!olsdm) {
2771  PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2772  "failed to find map object in output list!");
2773  return;
2774  }
2775 
2776  olsdm->SetDataMembers(fSelector);
2777 }
2778 
2779 ////////////////////////////////////////////////////////////////////////////////
2780 
2782 {
2783  // Finalize a query.
2784  // Returns -1 in case of an error, 0 otherwise.
2785 
2786  if (IsClient()) {
2787  if (fOutputLists == 0) {
2788  if (force)
2789  if (fQuery)
2790  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2791  fQuery->GetName()), force);
2792  } else {
2793  // Make sure the all objects are in the output list
2794  PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2795  MergeOutput();
2796  }
2797  }
2798 
2799  Long64_t rv = 0;
2800  if (fProof->IsMaster()) {
2801 
2802  // Fill information for monitoring and stop it
2803  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2804  if (!status) {
2805  // The query was aborted: let's add some info in the output list
2806  status = new TStatus();
2807  fOutput->Add(status);
2808  TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2809  status->Add(emsg);
2810  }
2811  status->SetExitStatus((Int_t) GetExitStatus());
2812 
2813  PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2814  // Some objects (e.g. histos in autobin) may not have been merged yet
2815  // do it now
2816  MergeOutput();
2817 
2818  fOutput->SetOwner();
2819 
2820  // Add the active-wrks-vs-proctime info from the packetizer
2821  if (fPacketizer) {
2823  if (pperf) fOutput->Add(pperf);
2825  if (parms) {
2826  TIter nxo(parms);
2827  TObject *o = 0;
2828  while ((o = nxo())) fOutput->Add(o);
2829  }
2830 
2831  // If other invalid elements were found during processing, add them to the
2832  // list of missing elements
2833  TDSetElement *elem = 0;
2834  if (fPacketizer->GetFailedPackets()) {
2836  TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2837  if (!listOfMissingFiles) {
2838  listOfMissingFiles = new TList;
2839  listOfMissingFiles->SetName("MissingFiles");
2840  }
2842  while ((elem = (TDSetElement *)nxe()))
2843  listOfMissingFiles->Add(elem->GetFileInfo(type));
2844  if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2845  }
2846  }
2847 
2848  TPerfStats::Stop();
2849  // Save memory usage on master
2850  Long_t vmaxmst, rmaxmst;
2851  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2852  status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2853 
2855 
2856  } else {
2857  if (fExitStatus != kAborted) {
2858 
2859  if (!sync) {
2860  // Reinit selector (with multi-sessioning we must do this until
2861  // TSelector::GetSelector() is optimized to i) avoid reloading of an
2862  // unchanged selector and ii) invalidate existing instances of
2863  // reloaded selector)
2864  if (ReinitSelector(fQuery) == -1) {
2865  Info("Finalize", "problems reinitializing selector \"%s\"",
2866  fQuery->GetSelecImp()->GetName());
2867  return -1;
2868  }
2869  }
2870 
2871  if (fPacketizer)
2872  if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2874  failedPackets->SetName("FailedPackets");
2875  AddOutputObject(failedPackets);
2876 
2877  TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2878  if (!status) AddOutputObject((status = new TStatus()));
2879  status->Add("Some packets were not processed! Check the the"
2880  " 'FailedPackets' list in the output list");
2881  }
2882 
2883  // Some input parameters may be needed in Terminate
2885 
2887  if (output) {
2888  TIter next(fOutput);
2889  while(TObject* obj = next()) {
2890  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2891  // Either parallel or not a canvas or not able to display it:
2892  // just add to the list
2893  output->Add(obj);
2894  }
2895  } else {
2896  Warning("Finalize", "undefined output list in the selector! Protocol error?");
2897  }
2898 
2899  // We need to do this because the output list can be modified in TSelector::Terminate
2900  // in a way to invalidate existing objects; so we clean the links when still valid and
2901  // we re-copy back later
2903  fOutput->Clear("nodelete");
2904 
2905  // Map output objects to selector members
2907 
2908  PDB(kLoop,1) Info("Finalize","Call Terminate()");
2909  // This is the end of merging
2910  SetMerging(kFALSE);
2911  // We measure the merge time
2912  fProof->fQuerySTW.Reset();
2913  // Call Terminate now
2914  fSelector->Terminate();
2915 
2916  rv = fSelector->GetStatus();
2917 
2918  // Copy the output list back and clean the selector's list
2919  TIter it(output);
2920  while(TObject* o = it()) {
2921  fOutput->Add(o);
2922  }
2923 
2924  // Save the output list in the current query, if any
2925  if (fQuery) {
2927  // Set in finalized state (cannot be done twice)
2928  fQuery->SetFinalized();
2929  } else {
2930  Warning("Finalize","current TQueryResult object is undefined!");
2931  }
2932 
2933  if (!fCreateSelObj) {
2936  if (output) output->Remove(fSelector);
2937  fSelector = 0;
2938  }
2939 
2940  // We have transferred copy of the output objects in TQueryResult,
2941  // so now we can cleanup the selector, making sure that we do not
2942  // touch the output objects
2943  if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2945 
2946  // Delete fOutput (not needed anymore, cannot be finalized twice),
2947  // making sure that the objects saved in TQueryResult are not deleted
2949  fOutput->Clear("nodelete");
2951 
2952  } else {
2953 
2954  // Cleanup
2955  fOutput->SetOwner();
2957  if (!fCreateSelObj) fSelector = 0;
2958  }
2959  }
2960  PDB(kGlobal,1) Info("Process","exit");
2961 
2962  if (!IsClient()) {
2963  Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2964  }
2966 
2967  return rv;
2968 }
2969 
2970 ////////////////////////////////////////////////////////////////////////////////
2971 /// Finalize the results of a query already processed.
2972 
2974 {
2975  PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2976 
2977  if (!IsClient()) {
2978  Info("Finalize(TQueryResult *)",
2979  "method to be executed only on the clients");
2980  return -1;
2981  }
2982 
2983  if (!qr) {
2984  Info("Finalize(TQueryResult *)", "query undefined");
2985  return -1;
2986  }
2987 
2988  if (qr->IsFinalized()) {
2989  Info("Finalize(TQueryResult *)", "query already finalized");
2990  return -1;
2991  }
2992 
2993  // Reset the list
2994  if (!fOutput)
2995  fOutput = new THashList;
2996  else
2997  fOutput->Clear();
2998 
2999  // Make sure that the temporary output list is empty
3000  if (fOutputLists) {
3001  fOutputLists->Delete();
3002  delete fOutputLists;
3003  fOutputLists = 0;
3004  }
3005 
3006  // Re-init the selector
3008 
3009  // Import the output list
3010  TList *tmp = (TList *) qr->GetOutputList();
3011  if (!tmp) {
3012  gSystem->RedirectOutput(0);
3013  Info("Finalize(TQueryResult *)", "outputlist is empty");
3014  return -1;
3015  }
3016  TList *out = fOutput;
3017  if (fProof->fProtocol < 11)
3018  out = new TList;
3019  TIter nxo(tmp);
3020  TObject *o = 0;
3021  while ((o = nxo()))
3022  out->Add(o->Clone());
3023 
3024  // Adopts the list
3025  if (fProof->fProtocol < 11) {
3026  out->SetOwner();
3027  StoreOutput(out);
3028  }
3029  gSystem->RedirectOutput(0);
3030 
3032 
3033  // Finalize it
3034  SetCurrentQuery(qr);
3035  Long64_t rc = Finalize();
3037 
3038  return rc;
3039 }
3040 
3041 ////////////////////////////////////////////////////////////////////////////////
3042 /// Send the selector file(s) to master or worker nodes.
3043 
3044 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
3045 {
3046  // Check input
3047  if (!selector_file) {
3048  Info("SendSelector", "Invalid input: selector (file) name undefined");
3049  return kFALSE;
3050  }
3051 
3052  if (!strchr(gSystem->BaseName(selector_file), '.')) {
3053  if (gDebug > 1)
3054  Info("SendSelector", "selector name '%s' does not contain a '.':"
3055  " nothing to send, it will be loaded from a library", selector_file);
3056  return kTRUE;
3057  }
3058 
3059  // Extract the fine name first
3060  TString selec = selector_file;
3061  TString aclicMode;
3062  TString arguments;
3063  TString io;
3064  selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3065 
3066  // Expand possible envs or '~'
3067  gSystem->ExpandPathName(selec);
3068 
3069  // Update the macro path
3071  TString np = gSystem->GetDirName(selec);
3072  if (!np.IsNull()) {
3073  np += ":";
3074  if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3075  Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3076  mp.Insert(ip, np);
3077  TROOT::SetMacroPath(mp);
3078  if (gDebug > 0)
3079  Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3080  }
3081  }
3082 
3083  // Header file
3084  TString header = selec;
3085  header.Remove(header.Last('.'));
3086  header += ".h";
3087  if (gSystem->AccessPathName(header, kReadPermission)) {
3088  TString h = header;
3089  header.Remove(header.Last('.'));
3090  header += ".hh";
3091  if (gSystem->AccessPathName(header, kReadPermission)) {
3092  Info("SendSelector",
3093  "header file not found: tried: %s %s", h.Data(), header.Data());
3094  return kFALSE;
3095  }
3096  }
3097 
3098  // Send files now
3100  Info("SendSelector", "problems sending implementation file %s", selec.Data());
3101  return kFALSE;
3102  }
3103  if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3104  Info("SendSelector", "problems sending header file %s", header.Data());
3105  return kFALSE;
3106  }
3107 
3108  return kTRUE;
3109 }
3110 
3111 ////////////////////////////////////////////////////////////////////////////////
3112 /// Merge objects in output the lists.
3113 
3115 {
3116  PDB(kOutput,1) Info("MergeOutput","Enter");
3117 
3118  TObject *obj = 0;
3119  if (fOutputLists) {
3120 
3121  TIter next(fOutputLists);
3122 
3123  TList *list;
3124  while ( (list = (TList *) next()) ) {
3125 
3126  if (!(obj = fOutput->FindObject(list->GetName()))) {
3127  obj = list->First();
3128  list->Remove(obj);
3129  fOutput->Add(obj);
3130  }
3131 
3132  if ( list->IsEmpty() ) continue;
3133 
3134  TMethodCall callEnv;
3135  if (obj->IsA())
3136  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3137  if (callEnv.IsValid()) {
3138  callEnv.SetParam((Long_t) list);
3139  callEnv.Execute(obj);
3140  } else {
3141  // No Merge interface, return individual objects
3142  while ( (obj = list->First()) ) {
3143  fOutput->Add(obj);
3144  list->Remove(obj);
3145  }
3146  }
3147  }
3149 
3150  } else {
3151 
3152  PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3153  }
3154 
3155  if (!IsClient() || fProof->IsLite()) {
3156  // Merge the output files created on workers, if any
3157  MergeOutputFiles();
3158  }
3159 
3160  // If there are TProofOutputFile objects we have to make sure that the internal
3161  // information is consistent for the cases where this object is going to be merged
3162  // again (e.g. when using submergers or in a multi-master setup). This may not be
3163  // the case because the first coming in is taken as reference and it has the
3164  // internal dir and raw dir of the originating worker.
3165  TString key;
3166  TNamed *nm = 0;
3167  TList rmlist;
3168  TIter nxo(fOutput);
3169  while ((obj = nxo())) {
3170  TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3171  if (pf) {
3172  if (gProofServ) {
3173  PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3174  TString dir(pf->GetOutputFileName());
3175  PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3176  // The dir
3177  if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3178  PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3179  pf->SetDir(dir);
3180  // The raw dir; for xrootd based system we include the 'localroot', if any
3181  TUrl u(dir);
3182  dir = u.GetFile();
3183  TString pfx = gEnv->GetValue("Path.Localroot","");
3184  if (!pfx.IsNull() &&
3185  (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3186  dir.Insert(0, pfx);
3187  PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3188  pf->SetDir(dir, kTRUE);
3189  // The worker ordinal
3191  // The saved output file name, if any
3192  key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3193  if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3194  pf->SetOutputFileName(nm->GetTitle());
3195  rmlist.Add(nm);
3197  pf->SetOutputFileName(0);
3199  }
3200  // The filename (order is important to exclude '.merger' from the key)
3201  dir = pf->GetFileName();
3203  dir += ".merger";
3204  pf->SetMerged(kFALSE);
3205  } else {
3206  if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3207  }
3208  pf->SetFileName(dir);
3209  } else if (fProof->IsLite()) {
3210  // The ordinal
3211  pf->SetWorkerOrdinal("0");
3212  // The dir
3214  // The filename and raw dir
3215  TUrl u(pf->GetOutputFileName(), kTRUE);
3216  pf->SetFileName(gSystem->BaseName(u.GetFile()));
3217  pf->SetDir(gSystem->GetDirName(u.GetFile()), kTRUE);
3218  // Notify the output path
3219  Printf("\nOutput file: %s", pf->GetOutputFileName());
3220  }
3221  } else {
3222  PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3223  }
3224  }
3225 
3226  // Remove temporary objects from fOutput
3227  if (rmlist.GetSize() > 0) {
3228  TIter nxrm(&rmlist);
3229  while ((obj = nxrm()))
3230  fOutput->Remove(obj);
3231  rmlist.SetOwner(kTRUE);
3232  }
3233 
3234  // If requested (typically in case of submerger to count possible side-effects in that process)
3235  // save the measured memory usage
3236  if (saveMemValues) {
3237  TPerfStats::Stop();
3238  // Save memory usage on master
3239  Long_t vmaxmst, rmaxmst;
3240  TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3241  TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3242  if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3243  }
3244 
3245  PDB(kOutput,1) fOutput->Print();
3246  PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3247 }
3248 
3249 ////////////////////////////////////////////////////////////////////////////////
3250 /// Progress signal.
3251 
3253 {
3254  if (IsClient()) {
3255  fProof->Progress(total, processed);
3256  } else {
3257  // Send to the previous tier
3259  m << total << processed;
3260  gProofServ->GetSocket()->Send(m);
3261  }
3262 }
3263 
3264 ////////////////////////////////////////////////////////////////////////////////
3265 /// Progress signal.
3266 
3268  Long64_t bytesread,
3269  Float_t initTime, Float_t procTime,
3270  Float_t evtrti, Float_t mbrti)
3271 {
3272  PDB(kGlobal,1)
3273  Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3274  initTime, procTime, evtrti, mbrti);
3275 
3276  if (IsClient()) {
3277  fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3278  } else {
3279  // Send to the previous tier
3281  m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3282  gProofServ->GetSocket()->Send(m);
3283  }
3284 }
3285 
3286 ////////////////////////////////////////////////////////////////////////////////
3287 /// Progress signal.
3288 
3290 {
3291  if (pi) {
3292  PDB(kGlobal,1)
3293  Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3294  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3295  pi->fActWorkers, pi->fEffSessions);
3296 
3297  if (IsClient()) {
3298  fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3299  pi->fInitTime, pi->fProcTime,
3300  pi->fEvtRateI, pi->fMBRateI,
3301  pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3302  } else {
3303  // Send to the previous tier
3305  m << pi;
3306  gProofServ->GetSocket()->Send(m);
3307  }
3308  } else {
3309  Warning("Progress","TProofProgressInfo object undefined!");
3310  }
3311 }
3312 
3313 
3314 ////////////////////////////////////////////////////////////////////////////////
3315 /// Feedback signal.
3316 
3318 {
3319  fProof->Feedback(objs);
3320 }
3321 
3322 ////////////////////////////////////////////////////////////////////////////////
3323 /// Stop process after this event.
3324 
3326 {
3327  if (fPacketizer != 0)
3328  fPacketizer->StopProcess(abort, kFALSE);
3329  if (abort == kTRUE)
3331  else
3333 }
3334 
3335 ////////////////////////////////////////////////////////////////////////////////
3336 /// Incorporate the received object 'obj' into the output list fOutput.
3337 /// The latter is created if not existing.
3338 /// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3339 /// consumption.
3340 /// Returns -1 in case of error, 1 if the object has been merged into another
3341 /// one (so that its ownership has not been taken and can be deleted), and 0
3342 /// otherwise.
3343 
3345 {
3346  PDB(kOutput,1)
3347  Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3348 
3349  // We must something to process
3350  if (!obj) {
3351  PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3352  return -1;
3353  }
3354 
3355  // Create the output list, if not yet done
3356  if (!fOutput)
3357  fOutput = new THashList;
3358 
3359  // Flag about merging
3360  Bool_t merged = kTRUE;
3361 
3362  // Process event lists first
3363  TList *elists = dynamic_cast<TList *> (obj);
3364  if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3365 
3366  // Create a global event list, result of merging the event lists
3367  // coresponding to the various data set elements
3368  TEventList *evlist = new TEventList("PROOF_EventList");
3369 
3370  // Iterate the list of event list segments
3371  TIter nxevl(elists);
3372  TEventList *evl = 0;
3373  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3374 
3375  // Find the file offset (fDSet is the current TDSet instance)
3376  // locating the element by name
3377  TIter nxelem(fDSet->GetListOfElements());
3378  TDSetElement *elem = 0;
3379  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3380  if (!strcmp(elem->GetFileName(), evl->GetName()))
3381  break;
3382  }
3383  if (!elem) {
3384  Error("AddOutputObject", "Found an event list for %s, but no object with"
3385  " the same name in the TDSet", evl->GetName());
3386  continue;
3387  }
3388  Long64_t offset = elem->GetTDSetOffset();
3389 
3390  // Shift the list by the number of first event in that file
3391  Long64_t *arr = evl->GetList();
3392  Int_t num = evl->GetN();
3393  if (arr && offset > 0)
3394  for (Int_t i = 0; i < num; i++)
3395  arr[i] += offset;
3396 
3397  // Add to the global event list
3398  evlist->Add(evl);
3399  }
3400 
3401  // Incorporate the resulting global list in fOutput
3402  SetLastMergingMsg(evlist);
3403  Incorporate(evlist, fOutput, merged);
3404  NotifyMemory(evlist);
3405 
3406  // Delete the global list if merged
3407  if (merged)
3408  SafeDelete(evlist);
3409 
3410  // The original object has been transformed in something else; we do
3411  // not have ownership on it
3412  return 1;
3413  }
3414 
3415  // Check if we need to merge files
3416  TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3417  if (pf) {
3418  fMergeFiles = kTRUE;
3419  if (!IsClient() || fProof->IsLite()) {
3420  if (pf->IsMerge()) {
3421  Bool_t hasfout = (pf->GetOutputFileName() &&
3422  strlen(pf->GetOutputFileName()) > 0 &&
3424  Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3425  if (setfout) {
3426 
3427  TString ddir, ddopts;
3428  if (gProofServ) {
3429  ddir.Form("%s/", gProofServ->GetDataDir());
3430  if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3431  }
3432  // Set the output file
3433  TString outfile(pf->GetOutputFileName());
3434  outfile.ReplaceAll("<datadir>/", ddir.Data());
3435  if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3436  pf->SetOutputFileName(outfile);
3437 
3438  if (gProofServ) {
3439  // If submerger, save first the existing filename, if any
3440  if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3441  TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3442  if (!fOutput->FindObject(key.Data()))
3443  fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3444  }
3445  TString of;
3447  if (of.IsNull()) {
3448  // Assume an xroot server running on the machine
3449  of.Form("root://%s/", gSystem->HostName());
3450  if (gSystem->Getenv("XRDPORT")) {
3451  TString sp(gSystem->Getenv("XRDPORT"));
3452  if (sp.IsDigit())
3453  of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3454  }
3455  }
3456  TString sessionPath(gProofServ->GetSessionDir());
3457  TProofServ::FilterLocalroot(sessionPath, of);
3458  of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3460  if (!of.EndsWith(".merger")) of += ".merger";
3461  } else {
3462  if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3463  }
3464  pf->SetOutputFileName(of);
3465  }
3466  }
3467  // Notify
3468  PDB(kOutput, 1) pf->Print();
3469  }
3470  } else {
3471  // On clients notify the output path
3472  Printf("Output file: %s", pf->GetOutputFileName());
3473  }
3474  }
3475 
3476  // For other objects we just run the incorporation procedure
3477  SetLastMergingMsg(obj);
3478  Incorporate(obj, fOutput, merged);
3479  NotifyMemory(obj);
3480 
3481  // We are done
3482  return (merged ? 1 : 0);
3483 }
3484 
3485 ////////////////////////////////////////////////////////////////////////////////
3486 /// Control output redirection to TProof::fLogFileW
3487 
3489 {
3490  if (on && fProof && fProof->fLogFileW) {
3493  } else if (!on) {
3494  if (fErrorHandler) {
3497  }
3498  }
3499 }
3500 
3501 ////////////////////////////////////////////////////////////////////////////////
3502 /// Incorporate the content of the received output list 'out' into the final
3503 /// output list fOutput. The latter is created if not existing.
3504 /// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3505 /// consumption.
3506 
3508 {
3509  PDB(kOutput,1) Info("AddOutput","Enter");
3510 
3511  // We must something to process
3512  if (!out) {
3513  PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3514  return;
3515  }
3516 
3517  // Create the output list, if not yet done
3518  if (!fOutput)
3519  fOutput = new THashList;
3520 
3521  // Process event lists first
3522  Bool_t merged = kTRUE;
3523  TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3524  if (elists) {
3525 
3526  // Create a global event list, result of merging the event lists
3527  // corresponding to the various data set elements
3528  TEventList *evlist = new TEventList("PROOF_EventList");
3529 
3530  // Iterate the list of event list segments
3531  TIter nxevl(elists);
3532  TEventList *evl = 0;
3533  while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3534 
3535  // Find the file offset (fDSet is the current TDSet instance)
3536  // locating the element by name
3537  TIter nxelem(fDSet->GetListOfElements());
3538  TDSetElement *elem = 0;
3539  while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3540  if (!strcmp(elem->GetFileName(), evl->GetName()))
3541  break;
3542  }
3543  if (!elem) {
3544  Error("AddOutput", "Found an event list for %s, but no object with"
3545  " the same name in the TDSet", evl->GetName());
3546  continue;
3547  }
3548  Long64_t offset = elem->GetTDSetOffset();
3549 
3550  // Shift the list by the number of first event in that file
3551  Long64_t *arr = evl->GetList();
3552  Int_t num = evl->GetN();
3553  if (arr && offset > 0)
3554  for (Int_t i = 0; i < num; i++)
3555  arr[i] += offset;
3556 
3557  // Add to the global event list
3558  evlist->Add(evl);
3559  }
3560 
3561  // Remove and delete the events lists object to avoid spoiling iteration
3562  // during next steps
3563  out->Remove(elists);
3564  delete elists;
3565 
3566  // Incorporate the resulting global list in fOutput
3567  SetLastMergingMsg(evlist);
3568  Incorporate(evlist, fOutput, merged);
3569  NotifyMemory(evlist);
3570  }
3571 
3572  // Iterate on the remaining objects in the received list
3573  TIter nxo(out);
3574  TObject *obj = 0;
3575  while ((obj = nxo())) {
3576  SetLastMergingMsg(obj);
3577  Incorporate(obj, fOutput, merged);
3578  // If not merged, drop from the temporary list, as the ownership
3579  // passes to fOutput
3580  if (!merged)
3581  out->Remove(obj);
3582  NotifyMemory(obj);
3583  }
3584 
3585  // Done
3586  return;
3587 }
3588 
3589 ////////////////////////////////////////////////////////////////////////////////
3590 /// Printout the memory record after merging object 'obj'
3591 /// This record is used by the memory monitor
3592 
3594 {
3595  if (fProof && (!IsClient() || fProof->IsLite())){
3596  ProcInfo_t pi;
3597  if (!gSystem->GetProcInfo(&pi)){
3598  // For PROOF-Lite we redirect this output to a the open log file so that the
3599  // memory monitor can pick these messages up
3601  Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3602  pi.fMemVirtual, pi.fMemResident, obj->GetName());
3603  RedirectOutput(0);
3604  }
3605  // Record also values for monitoring
3607  }
3608 }
3609 
3610 ////////////////////////////////////////////////////////////////////////////////
3611 /// Set the message to be notified in case of exception
3612 
3614 {
3615  TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3616  TProofServ::SetLastMsg(lastMsg);
3617 }
3618 
3619 ////////////////////////////////////////////////////////////////////////////////
3620 /// Incorporate object 'newobj' in the list 'outlist'.
3621 /// The object is merged with an object of the same name already existing in
3622 /// the list, or just added.
3623 /// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3624 /// this happens if the Merge() method does not exist or if a object named as 'obj'
3625 /// is not already in the list. If the obj is not 'merged' than it should not be
3626 /// deleted, unless outlist is not owner of its objects.
3627 /// Return 0 on success, -1 on error.
3628 
3630 {
3631  merged = kTRUE;
3632 
3633  PDB(kOutput,1)
3634  Info("Incorporate", "enter: obj: %p (%s), list: %p",
3635  newobj, newobj ? newobj->ClassName() : "undef", outlist);
3636 
3637  // The object and list must exist
3638  if (!newobj || !outlist) {
3639  Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3640  return -1;
3641  }
3642 
3643  // Special treatment for histograms in autobin mode
3644  Bool_t specialH =
3646  if (specialH && newobj->InheritsFrom(TH1::Class())) {
3647  if (!HandleHistogram(newobj, merged)) {
3648  if (merged) {
3649  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3650  } else {
3651  PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3652  " appropriate list for delayed merging", newobj->GetName());
3653  }
3654  return 0;
3655  }
3656  }
3657 
3658  // Check if an object with the same name exists already
3659  TObject *obj = outlist->FindObject(newobj->GetName());
3660 
3661  // If no, add the new object and return
3662  if (!obj) {
3663  outlist->Add(newobj);
3664  merged = kFALSE;
3665  // Done
3666  return 0;
3667  }
3668 
3669  // Locate the Merge(TCollection *) method
3670  TMethodCall callEnv;
3671  if (obj->IsA())
3672  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3673  if (callEnv.IsValid()) {
3674  // Found: put the object in a one-element list
3675  static TList *xlist = new TList;
3676  xlist->Add(newobj);
3677  // Call the method
3678  callEnv.SetParam((Long_t) xlist);
3679  callEnv.Execute(obj);
3680  // Ready for next call
3681  xlist->Clear();
3682  } else {
3683  // Not found: return individual objects
3684  outlist->Add(newobj);
3685  merged = kFALSE;
3686  }
3687 
3688  // Done
3689  return 0;
3690 }
3691 
3692 ////////////////////////////////////////////////////////////////////////////////
3693 /// Low statistic histograms need a special treatment when using autobin
3694 
3696 {
3697  TH1 *h = dynamic_cast<TH1 *>(obj);
3698  if (!h) {
3699  // Not an histo
3700  return obj;
3701  }
3702 
3703  // This is only used if we return (TObject *)0 and there is only one case
3704  // when we set this to kTRUE
3705  merged = kFALSE;
3706 
3707  // Does is still needs binning ?
3708  Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3709 
3710  // Number of entries
3711  Int_t nent = h->GetBufferLength();
3712  PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3713  h->GetName(), nent, h->GetBufferSize());
3714 
3715  // Attach to the list in the outputlists, if any
3716  TList *list = 0;
3717  if (!fOutputLists) {
3718  PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3719  fOutputLists = new TList;
3721  }
3722  list = (TList *) fOutputLists->FindObject(h->GetName());
3723 
3724  TH1 *href = 0;
3725  if (tobebinned) {
3726 
3727  // The histogram needs to be projected in a reasonable range: we
3728  // do this at the end with all the histos, so we need to create
3729  // a list here
3730  if (!list) {
3731  // Create the list
3732  list = new TList;
3733  list->SetName(h->GetName());
3734  list->SetOwner();
3735  fOutputLists->Add(list);
3736  // Move in it any previously merged object from the output list
3737  if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3738  fOutput->Remove(href);
3739  list->Add(href);
3740  }
3741  }
3742  TIter nxh(list);
3743  while ((href = (TH1 *) nxh())) {
3744  if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3745  }
3746  if (href) {
3747  list->AddBefore(href, h);
3748  } else {
3749  list->Add(h);
3750  }
3751  // Done
3752  return (TObject *)0;
3753 
3754  } else {
3755 
3756  if (list) {
3757  TIter nxh(list);
3758  while ((href = (TH1 *) nxh())) {
3759  if (href->GetBuffer() || href->GetEntries() < nent) break;
3760  }
3761  if (href) {
3762  list->AddBefore(href, h);
3763  } else {
3764  list->Add(h);
3765  }
3766  // Done
3767  return (TObject *)0;
3768 
3769  } else {
3770  // Check if we can 'Add' the histogram to an existing one; this is more efficient
3771  // then using Merge
3772  TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3773  if (hout) {
3774  // Remove the existing histo from the output list ...
3775  fOutput->Remove(hout);
3776  // ... and create either the list to merge in one-go at the end
3777  // (more efficient than merging one by one) or, if too big, merge
3778  // these two and start the 'one-by-one' technology
3779  Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3780  if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
3781  list = new TList;
3782  list->Add(hout);
3783  h->Merge(list);
3784  list->SetOwner();
3785  delete list;
3786  return h;
3787  } else {
3788  list = new TList;
3789  list->SetName(h->GetName());
3790  list->SetOwner();
3791  fOutputLists->Add(list);
3792  // Add the existing and the incoming histos
3793  list->Add(hout);
3794  list->Add(h);
3795  // Done
3796  return (TObject *)0;
3797  }
3798  } else {
3799  // This is the first one; add it to the output list
3800  fOutput->Add(h);
3801  return (TObject *)0;
3802  }
3803  }
3804  }
3805 }
3806 
3807 ////////////////////////////////////////////////////////////////////////////////
3808 /// Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges
3809 /// on the axis (i.e. if they can be just Add-ed for merging).
3810 
3812 {
3813  Bool_t rc = kFALSE;
3814  if (!h0 || !h1) return rc;
3815 
3816  TAxis *a0 = 0, *a1 = 0;
3817 
3818  // Check X
3819  a0 = h0->GetXaxis();
3820  a1 = h1->GetXaxis();
3821  if (a0->GetNbins() == a1->GetNbins())
3822  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3823  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3824 
3825  // Check Y, if needed
3826  if (h0->GetDimension() > 1) {
3827  rc = kFALSE;
3828  a0 = h0->GetYaxis();
3829  a1 = h1->GetYaxis();
3830  if (a0->GetNbins() == a1->GetNbins())
3831  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3832  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3833  }
3834 
3835  // Check Z, if needed
3836  if (h0->GetDimension() > 2) {
3837  rc = kFALSE;
3838  a0 = h0->GetZaxis();
3839  a1 = h1->GetZaxis();
3840  if (a0->GetNbins() == a1->GetNbins())
3841  if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3842  if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3843  }
3844 
3845  // Done
3846  return rc;
3847 }
3848 
3849 ////////////////////////////////////////////////////////////////////////////////
3850 /// Store received output list.
3851 
3853 {
3854  PDB(kOutput,1) Info("StoreOutput","Enter");
3855 
3856  if ( out == 0 ) {
3857  PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3858  return;
3859  }
3860 
3861  TIter next(out);
3862  out->SetOwner(kFALSE); // take ownership of the contents
3863 
3864  if (fOutputLists == 0) {
3865  PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3866  fOutputLists = new TList;
3868  }
3869  // process eventlists first
3870  TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3871  if (lists) {
3872  out->Remove(lists);
3873  TEventList *mainList = new TEventList("PROOF_EventList");
3874  out->Add(mainList);
3875  TIter it(lists);
3876  TEventList *aList;
3877  while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3878  // find file offset
3879  TIter nxe(fDSet->GetListOfElements());
3880  TDSetElement *elem;
3881  while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3882  if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3883  break;
3884  }
3885  if (!elem) {
3886  Error("StoreOutput", "found the EventList for %s, but no object with that name "
3887  "in the TDSet", aList->GetName());
3888  continue;
3889  }
3890  Long64_t offset = elem->GetTDSetOffset();
3891 
3892  // shift the list by the number of first event in that file
3893  Long64_t *arr = aList->GetList();
3894  Int_t num = aList->GetN();
3895  if (arr && offset)
3896  for (int i = 0; i < num; i++)
3897  arr[i] += offset;
3898 
3899  mainList->Add(aList); // add to the main list
3900  }
3901  delete lists;
3902  }
3903 
3904  TObject *obj;
3905  while( (obj = next()) ) {
3906  PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3907 
3908  TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3909  if ( list == 0 ) {
3910  PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3911  list = new TList;
3912  list->SetName( obj->GetName() );
3913  list->SetOwner();
3914  fOutputLists->Add( list );
3915  }
3916  list->Add( obj );
3917  }
3918 
3919  delete out;
3920  PDB(kOutput,1) Info("StoreOutput", "leave");
3921 }
3922 
3923 ////////////////////////////////////////////////////////////////////////////////
3924 /// Merge feedback lists.
3925 
3927 {
3928  PDB(kFeedback,1)
3929  Info("MergeFeedback","Enter");
3930 
3931  if ( fFeedbackLists == 0 ) {
3932  PDB(kFeedback,1)
3933  Info("MergeFeedback","Leave (no output)");
3934  return 0;
3935  }
3936 
3937  TList *fb = new TList; // collection of feedback objects
3938  fb->SetOwner();
3939 
3940  TIter next(fFeedbackLists);
3941 
3942  TMap *map;
3943  while ( (map = (TMap*) next()) ) {
3944 
3945  PDB(kFeedback,2)
3946  Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3947 
3948  // turn map into list ...
3949 
3950  TList *list = new TList;
3951  TIter keys(map);
3952 
3953 #ifndef R__TH1MERGEFIXED
3954  Int_t nbmx = -1;
3955  TObject *oref = 0;
3956 #endif
3957  while ( TObject *key = keys() ) {
3958  TObject *o = map->GetValue(key);
3959  TH1 *h = dynamic_cast<TH1 *>(o);
3960 #ifndef R__TH1MERGEFIXED
3961  // Temporary fix for to cope with the problem in TH1::Merge.
3962  // We need to use a reference histo the one with the largest number
3963  // of bins so that the histos from all submasters can be correctly
3964  // fit in
3965  if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3966  if (h->GetNbinsX() > nbmx) {
3967  nbmx= h->GetNbinsX();
3968  oref = o;
3969  }
3970  }
3971 #endif
3972  if (h) {
3973  TIter nxh(list);
3974  TH1 *href= 0;
3975  while ((href = (TH1 *)nxh())) {
3976  if (h->GetBuffer()) {
3977  if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3978  } else {
3979  if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3980  }
3981  }
3982  if (href) {
3983  list->AddBefore(href, h);
3984  } else {
3985  list->Add(h);
3986  }
3987  } else {
3988  list->Add(o);
3989  }
3990  }
3991 
3992  // clone first object, remove from list
3993 #ifdef R__TH1MERGEFIXED
3994  TObject *obj = list->First();
3995 #else
3996  TObject *obj = (oref) ? oref : list->First();
3997 #endif
3998  list->Remove(obj);
3999  obj = obj->Clone();
4000  fb->Add(obj);
4001 
4002  if ( list->IsEmpty() ) {
4003  delete list;
4004  continue;
4005  }
4006 
4007  // merge list with clone
4008  TMethodCall callEnv;
4009  if (obj->IsA())
4010  callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4011  if (callEnv.IsValid()) {
4012  callEnv.SetParam((Long_t) list);
4013  callEnv.Execute(obj);
4014  } else {
4015  // No Merge interface, return copy of individual objects
4016  while ( (obj = list->First()) ) {
4017  fb->Add(obj->Clone());
4018  list->Remove(obj);
4019  }
4020  }
4021 
4022  delete list;
4023  }
4024 
4025  PDB(kFeedback,1)
4026  Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4027 
4028  return fb;
4029 }
4030 
4031 ////////////////////////////////////////////////////////////////////////////////
4032 /// Store feedback results from the specified slave.
4033 
4035 {
4036  PDB(kFeedback,1)
4037  Info("StoreFeedback","Enter");
4038 
4039  if ( out == 0 ) {
4040  PDB(kFeedback,1)
4041  Info("StoreFeedback","Leave (empty)");
4042  return;
4043  }
4044 
4045  if ( IsClient() ) {
4046  // in client
4047  Feedback(out);
4048  delete out;
4049  return;
4050  }
4051 
4052  if (fFeedbackLists == 0) {
4053  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4054  fFeedbackLists = new TList;
4056  }
4057 
4058  TIter next(out);
4059  out->SetOwner(kFALSE); // take ownership of the contents
4060 
4061  const char *ord = ((TSlave*) slave)->GetOrdinal();
4062 
4063  TObject *obj;
4064  while( (obj = next()) ) {
4065  PDB(kFeedback,2)
4066  Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4067  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4068  if ( map == 0 ) {
4069  PDB(kFeedback,2)
4070  Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4071  // Map must not be owner (ownership is with regards to the keys (only))
4072  map = new TMap;
4073  map->SetName(obj->GetName());
4074  fFeedbackLists->Add(map);
4075  } else {
4076  PDB(kFeedback,2)
4077  Info("StoreFeedback","%s: removing previous value", ord);
4078  if (map->GetValue(slave))
4079  delete map->GetValue(slave);
4080  map->Remove(slave);
4081  }
4082  map->Add(slave, obj);
4083  PDB(kFeedback,2)
4084  Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4085  }
4086 
4087  delete out;
4088  PDB(kFeedback,1)
4089  Info("StoreFeedback","Leave");
4090 }
4091 
4092 ////////////////////////////////////////////////////////////////////////////////
4093 /// Setup reporting of feedback objects.
4094 
4096 {
4097  if (IsClient()) return; // Client does not need timer
4098 
4099  fFeedback = (TList*) fInput->FindObject("FeedbackList");
4100 
4101  PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4102  fFeedback == 0 ? "NOT ":"");
4103 
4104  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4105 
4106  // OK, feedback was requested, setup the timer
4108  fFeedbackPeriod = 2000;
4109  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4110  fFeedbackTimer = new TTimer;
4111  fFeedbackTimer->SetObject(this);
4113 }
4114 
4115 ////////////////////////////////////////////////////////////////////////////////
4116 /// Stop reporting of feedback objects.
4117 
4119 {
4120  if (fFeedbackTimer == 0) return;
4121 
4122  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4123 
4125 }
4126 
4127 ////////////////////////////////////////////////////////////////////////////////
4128 /// Send feedback objects to client.
4129 
4131 {
4132  PDB(kFeedback,2) Info("HandleTimer","Entry");
4133 
4134  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4135 
4136  // process local feedback objects
4137 
4138  TList *fb = new TList;
4139  fb->SetOwner();
4140 
4141  TIter next(fFeedback);
4142  while( TObjString *name = (TObjString*) next() ) {
4143  TObject *o = fOutput->FindObject(name->GetName());
4144  if (o != 0) {
4145  fb->Add(o->Clone());
4146  // remove the corresponding entry from the feedback list
4147  TMap *m = 0;
4148  if (fFeedbackLists &&
4149  (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4151  m->DeleteValues();
4152  delete m;
4153  }
4154  }
4155  }
4156 
4157  if (fb->GetSize() > 0) {
4158  StoreFeedback(this, fb); // adopts fb
4159  } else {
4160  delete fb;
4161  }
4162 
4163  if (fFeedbackLists == 0) {
4164  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4165  return kFALSE;
4166  }
4167 
4168  fb = MergeFeedback();
4169 
4170  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4171 
4173  m << fb;
4174 
4175  // send message to client;
4176  gProofServ->GetSocket()->Send(m);
4177 
4178  delete fb;
4179 
4181 
4182  return kFALSE; // ignored?
4183 }
4184 
4185 ////////////////////////////////////////////////////////////////////////////////
4186 /// Get next packet for specified slave.
4187 
4189 {
4190  // The first call to this determines the end of initialization
4191  SetInitTime();
4192 
4193  if (fProcPackets) {
4194  Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4195  if (bin >= 0) {
4196  if (fProcPackets->GetBinContent(bin) > 0)
4197  fProcPackets->Fill(slave->GetOrdinal(), -1);
4198  }
4199  }
4200 
4201  TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4202 
4203  if (e == 0) {
4204  PDB(kPacketizer,2)
4205  Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4206  } else if (e == (TDSetElement*) -1) {
4207  PDB(kPacketizer,2)
4208  Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4209  } else {
4210  PDB(kPacketizer,2)
4211  Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4212  slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4213  e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4214  if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4215  }
4216 
4217  return e;
4218 }
4219 
4220 ////////////////////////////////////////////////////////////////////////////////
4221 /// Is the player running on the client?
4222 
4224 {
4226 }
4227 
4228 ////////////////////////////////////////////////////////////////////////////////
4229 /// Draw (support for TChain::Draw()).
4230 /// Returns -1 in case of error or number of selected events in case of success.
4231 
4233  const char *selection, Option_t *option,
4234  Long64_t nentries, Long64_t firstentry)
4235 {
4236  if (!fgDrawInputPars) {
4237  fgDrawInputPars = new THashList;
4238  fgDrawInputPars->Add(new TObjString("FeedbackList"));
4239  fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4240  fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4241  fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4242  fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4243  fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4244  fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4245  fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4246  fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4247  fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4248  fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4249  }
4250 
4251  TString selector, objname;
4252  if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4253  Error("DrawSelect", "parsing arguments");
4254  return -1;
4255  }
4256 
4257  TNamed *varexpobj = new TNamed("varexp", varexp);
4258  TNamed *selectionobj = new TNamed("selection", selection);
4259 
4260  // Save the current input list
4261  TObject *o = 0;
4262  TList *savedInput = new TList;
4263  TIter nxi(fInput);
4264  while ((o = nxi())) {
4265  savedInput->Add(o);
4266  TString n(o->GetName());
4267  if (fgDrawInputPars &&
4269  !n.BeginsWith("alias:")) fInput->Remove(o);
4270  }
4271 
4272  fInput->Add(varexpobj);
4273  fInput->Add(selectionobj);
4274 
4275  // Make sure we have an object name
4276  if (objname == "") objname = "htemp";
4277 
4278  fProof->AddFeedback(objname);
4279  Long64_t r = Process(set, selector, option, nentries, firstentry);
4280  fProof->RemoveFeedback(objname);
4281 
4282  fInput->Remove(varexpobj);
4283  fInput->Remove(selectionobj);
4284  if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4285  fInput->Remove(opt);
4286  delete opt;
4287  }
4288 
4289  delete varexpobj;
4290  delete selectionobj;
4291 
4292  // Restore the input list
4293  fInput->Clear();
4294  TIter nxsi(savedInput);
4295  while ((o = nxsi()))
4296  fInput->Add(o);
4297  savedInput->SetOwner(kFALSE);
4298  delete savedInput;
4299 
4300  return r;
4301 }
4302 
4303 ////////////////////////////////////////////////////////////////////////////////
4304 /// Set init time
4305 
4307 {
4308  if (fPacketizer)
4310 }
4311 
4312 //------------------------------------------------------------------------------
4313 
4314 
4316 
4317 ////////////////////////////////////////////////////////////////////////////////
4318 /// Setup feedback.
4319 
4321 {
4322  TList *fb = (TList*) fInput->FindObject("FeedbackList");
4323  if (fb) {
4324  PDB(kFeedback,1)
4325  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4326  } else {
4327  PDB(kFeedback,1)
4328  Info("SetupFeedback","\"FeedbackList\" NOT found");
4329  }
4330 
4331  if (fb == 0 || fb->GetSize() == 0) return;
4332 
4333  // OK, feedback was requested, setup the timer
4334 
4336  fFeedbackPeriod = 2000;
4337  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4338  fFeedbackTimer = new TTimer;
4339  fFeedbackTimer->SetObject(this);
4341 
4342  fFeedback = fb;
4343 }
4344 
4345 ////////////////////////////////////////////////////////////////////////////////
4346 /// Stop feedback.
4347 
4349 {
4350  if (fFeedbackTimer == 0) return;
4351 
4352  PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4353 
4355 }
4356 
4357 ////////////////////////////////////////////////////////////////////////////////
4358 /// Handle timer event.
4359 
4361 {
4362  PDB(kFeedback,2) Info("HandleTimer","Entry");
4363 
4364  // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4365  // so we also send the info to update the progress bar.
4366  if (gProofServ) {
4367  Bool_t sendm = kFALSE;
4369  if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4370  sendm = kTRUE;
4371  if (gProofServ->GetProtocol() > 25) {
4372  m << GetProgressStatus();
4373  } else if (gProofServ->GetProtocol() > 11) {
4375  m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4376  << (Float_t) -1. << (Float_t) ps->GetProcTime()
4377  << (Float_t) ps->GetRate() << (Float_t) -1.;
4378  } else {
4380  }
4381  }
4382  if (sendm) gProofServ->GetSocket()->Send(m);
4383  }
4384 
4385  if (fFeedback == 0) return kFALSE;
4386 
4387  TList *fb = new TList;
4388  fb->SetOwner(kFALSE);
4389 
4390  if (fOutput == 0) {
4392  }
4393 
4394  if (fOutput) {
4395  TIter next(fFeedback);
4396  while( TObjString *name = (TObjString*) next() ) {
4397  // TODO: find object in memory ... maybe allow only in fOutput ?
4398  TObject *o = fOutput->FindObject(name->GetName());
4399  if (o != 0) fb->Add(o);
4400  }
4401  }
4402 
4403  PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4404 
4406  m << fb;
4407 
4408  // send message to client;
4409  if (gProofServ) gProofServ->GetSocket()->Send(m);
4410 
4411  delete fb;
4412 
4414 
4415  return kFALSE; // ignored?
4416 }
4417 
4418 ////////////////////////////////////////////////////////////////////////////////
4419 /// Handle tree header request.
4420 
4422 {
4424 
4425  TDSet *dset;
4426  (*mess) >> dset;
4427  dset->Reset();
4428  TDSetElement *e = dset->Next();
4429  Long64_t entries = 0;
4430  TFile *f = 0;
4431  TTree *t = 0;
4432  if (!e) {
4433  PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4434  } else {
4435  f = TFile::Open(e->GetFileName());
4436  t = 0;
4437  if (f) {
4438  t = (TTree*) f->Get(e->GetObjName());
4439  if (t) {
4440  t->SetMaxVirtualSize(0);
4441  t->DropBaskets();
4442  entries = t->GetEntries();
4443 
4444  // compute #entries in all the files
4445  while ((e = dset->Next()) != 0) {
4446  TFile *f1 = TFile::Open(e->GetFileName());
4447  if (f1) {
4448  TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4449  if (t1) {
4450  entries += t1->GetEntries();
4451  delete t1;
4452  }
4453  delete f1;
4454  }
4455  }
4456  t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4457  }
4458  }
4459  }
4460  if (t)
4461  answ << TString("Success") << t;
4462  else
4463  answ << TString("Failed") << t;
4464 
4465  fSocket->Send(answ);
4466 
4467  SafeDelete(t);
4468  SafeDelete(f);
4469 }
4470 
4471 
4472 //------------------------------------------------------------------------------
4473 
4475 
4476 ////////////////////////////////////////////////////////////////////////////////
4477 /// Process specified TDSet on PROOF. Runs on super master.
4478 /// The return value is -1 in case of error and TSelector::GetStatus() in
4479 /// in case of success.
4480 
4481 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4482  Option_t *option, Long64_t nentries,
4483  Long64_t first)
4484 {
4486  PDB(kGlobal,1) Info("Process","Enter");
4487 
4488  TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4489  if (!proof) return -1;
4490 
4491  delete fOutput;
4492  fOutput = new THashList;
4493 
4495 
4496  if (!SendSelector(selector_file)) {
4497  Error("Process", "sending selector %s", selector_file);
4498  return -1;
4499  }
4500 
4501  TCleanup clean(this);
4502  SetupFeedback();
4503 
4504  if (proof->IsMaster()) {
4505 
4506  // make sure the DSet is valid
4507  if (!dset->ElementsValid()) {
4508  proof->ValidateDSet(dset);
4509  if (!dset->ElementsValid()) {
4510  Error("Process", "could not validate TDSet");
4511  return -1;
4512  }
4513  }
4514 
4515  TList msds;
4516  msds.SetOwner(); // This will delete TPairs
4517 
4518  TList keyholder; // List to clean up key part of the pairs
4519  keyholder.SetOwner();
4520  TList valueholder; // List to clean up value part of the pairs
4521  valueholder.SetOwner();
4522 
4523  // Construct msd list using the slaves
4524  TIter nextslave(proof->GetListOfActiveSlaves());
4525  while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4526  TList *submasters = 0;
4527  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4528  if (!msd) {
4529  submasters = new TList;
4530  submasters->SetName(sl->GetMsd());
4531  keyholder.Add(submasters);
4532  TList *setelements = new TSortedList(kSortDescending);
4533  setelements->SetName(TString(sl->GetMsd())+"_Elements");
4534  valueholder.Add(setelements);
4535  msds.Add(new TPair(submasters, setelements));
4536  } else {
4537  submasters = dynamic_cast<TList*>(msd->Key());
4538  }
4539  if (submasters) submasters->Add(sl);
4540  }
4541 
4542  // Add TDSetElements to msd list
4543  Long64_t cur = 0; //start of next element
4544  TIter nextelement(dset->GetListOfElements());
4545  while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4546 
4547  if (elem->GetNum()<1) continue; // get rid of empty elements
4548 
4549  if (nentries !=-1 && cur>=first+nentries) {
4550  // we are done
4551  break;
4552  }
4553 
4554  if (cur+elem->GetNum()-1<first) {
4555  //element is before first requested entry
4556  cur+=elem->GetNum();
4557  continue;
4558  }
4559 
4560  if (cur<first) {
4561  //modify element to get proper start
4562  elem->SetNum(elem->GetNum()-(first-cur));
4563  elem->SetFirst(elem->GetFirst()+first-cur);
4564  cur=first;
4565  }
4566 
4567  if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4568  cur+=elem->GetNum();
4569  } else {
4570  //modify element to get proper end
4571  elem->SetNum(first+nentries-cur);
4572  cur=first+nentries;
4573  }
4574 
4575  TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4576  if (!msd) {
4577  Error("Process", "data requires mass storage domain '%s'"
4578  " which is not accessible in this proof session",
4579  elem->GetMsd());
4580  return -1;
4581  } else {
4582  TList *elements = dynamic_cast<TList*>(msd->Value());
4583  if (elements) elements->Add(elem);
4584  }
4585  }
4586 
4587  TList usedmasters;
4588  TIter nextmsd(msds.MakeIterator());
4589  while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4590  TList *submasters = dynamic_cast<TList*>(msd->Key());
4591  TList *setelements = dynamic_cast<TList*>(msd->Value());
4592 
4593  // distribute elements over the masters
4594  Int_t nmasters = submasters ? submasters->GetSize() : -1;
4595  Int_t nelements = setelements ? setelements->GetSize() : -1;
4596  for (Int_t i=0; i<nmasters; i++) {
4597 
4598  Long64_t nent = 0;
4599  TDSet set(dset->GetType(), dset->GetObjName(),
4600  dset->GetDirectory());
4601  for (Int_t j = (i*nelements)/nmasters;
4602  j < ((i+1)*nelements)/nmasters;
4603  j++) {
4604  TDSetElement *elem = setelements ?
4605  dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4606  if (elem) {
4607  set.Add(elem->GetFileName(), elem->GetObjName(),
4608  elem->GetDirectory(), elem->GetFirst(),
4609  elem->GetNum(), elem->GetMsd());
4610  nent += elem->GetNum();
4611  } else {
4612  Warning("Process", "not a TDSetElement object");
4613  }
4614  }
4615 
4616  if (set.GetListOfElements()->GetSize()>0) {
4617  TMessage mesg(kPROOF_PROCESS);
4618  TString fn(gSystem->BaseName(selector_file));
4619  TString opt = option;
4620  mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4621 
4622  TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4623  if (sl) {
4624  PDB(kGlobal,1) Info("Process",
4625  "Sending TDSet with %d elements to submaster %s",
4626  set.GetListOfElements()->GetSize(),
4627  sl->GetOrdinal());
4628  sl->GetSocket()->Send(mesg);
4629  usedmasters.Add(sl);
4630 
4631  // setup progress info
4632  fSlaves.AddLast(sl);
4636  fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4644  fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4646  fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4648  fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4650  fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4652  fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4653  } else {
4654  Warning("Process", "not a TSlave object");
4655  }
4656  }
4657  }
4658  }
4659 
4660  if ( !IsClient() ) HandleTimer(0);
4661  PDB(kGlobal,1) Info("Process","Calling Collect");
4662  proof->Collect(&usedmasters);
4663  HandleTimer(0);
4664 
4665  }
4666 
4667  StopFeedback();
4668 
4669  PDB(kGlobal,1) Info("Process","Calling Merge Output");
4670  MergeOutput();
4671 
4672  TPerfStats::Stop();
4673 
4674  return 0;
4675 }
4676 
4677 ////////////////////////////////////////////////////////////////////////////////
4678 /// Report progress.
4679 
4681 {
4682  Int_t idx = fSlaves.IndexOf(sl);
4683  fSlaveProgress[idx] = processed;
4684  if (fSlaveTotals[idx] != total)
4685  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4686  fSlaveTotals[idx] = total;
4687 
4688  Long64_t tot = 0;
4689  Int_t i;
4690  for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4691  Long64_t proc = 0;
4692  for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4693 
4694  Progress(tot, proc);
4695 }
4696 
4697 ////////////////////////////////////////////////////////////////////////////////
4698 /// Report progress.
4699 
4701  Long64_t processed, Long64_t bytesread,
4702  Float_t initTime, Float_t procTime,
4703  Float_t evtrti, Float_t mbrti)
4704 {
4705  PDB(kGlobal,2)
4706  Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4707  processed, bytesread, initTime, procTime, evtrti, mbrti);
4708 
4709  Int_t idx = fSlaves.IndexOf(sl);
4710  if (fSlaveTotals[idx] != total)
4711  Warning("Progress", "total events has changed for slave %s", sl->GetName());
4712  fSlaveTotals[idx] = total;
4713  fSlaveProgress[idx] = processed;
4714  fSlaveBytesRead[idx] = bytesread;
4715  fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4716  fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4717  fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4718  fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4719 
4720  Int_t i;
4721  Long64_t tot = 0;
4722  Long64_t proc = 0;
4723  Long64_t bytes = 0;
4724  Float_t init = -1.;
4725  Float_t ptime = -1.;
4726  Float_t erti = 0.;
4727  Float_t srti = 0.;
4728  Int_t nerti = 0;
4729  Int_t nsrti = 0;
4730  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4731  tot += fSlaveTotals[i];
4732  if (i < fSlaveProgress.GetSize())
4733  proc += fSlaveProgress[i];
4734  if (i < fSlaveBytesRead.GetSize())
4735  bytes += fSlaveBytesRead[i];
4736  if (i < fSlaveInitTime.GetSize())
4737  if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4738  init = fSlaveInitTime[i];
4739  if (i < fSlaveProcTime.GetSize())
4740  if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4741  ptime = fSlaveProcTime[i];
4742  if (i < fSlaveEvtRti.GetSize())
4743  if (fSlaveEvtRti[i] > -1.) {
4744  erti += fSlaveEvtRti[i];
4745  nerti++;
4746  }
4747  if (i < fSlaveMBRti.GetSize())
4748  if (fSlaveMBRti[i] > -1.) {
4749  srti += fSlaveMBRti[i];
4750  nsrti++;
4751  }
4752  }
4753  srti = (nsrti > 0) ? srti / nerti : 0.;
4754 
4755  Progress(tot, proc, bytes, init, ptime, erti, srti);
4756 }
4757 
4758 ////////////////////////////////////////////////////////////////////////////////
4759 /// Progress signal.
4760 
4762 {
4763  if (pi) {
4764  PDB(kGlobal,2)
4765  Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4766  pi->fTotal, pi->fProcessed, pi->fBytesRead,
4767  pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4768  pi->fActWorkers, pi->fEffSessions);
4769 
4770  Int_t idx = fSlaves.IndexOf(wrk);
4771  if (fSlaveTotals[idx] != pi->fTotal)
4772  Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4773  fSlaveTotals[idx] = pi->fTotal;
4774  fSlaveProgress[idx] = pi->fProcessed;
4775  fSlaveBytesRead[idx] = pi->fBytesRead;
4776  fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4777  fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4778  fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4779  fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4780  fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4781  fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4782  fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4783 
4784  Int_t i;
4785  Int_t nerti = 0;
4786  Int_t nsrti = 0;
4787  TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4788  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4789  pisum.fTotal += fSlaveTotals[i];
4790  if (i < fSlaveProgress.GetSize())
4791  pisum.fProcessed += fSlaveProgress[i];
4792  if (i < fSlaveBytesRead.GetSize())
4793  pisum.fBytesRead += fSlaveBytesRead[i];
4794  if (i < fSlaveInitTime.GetSize())
4795  if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4796  pisum.fInitTime = fSlaveInitTime[i];
4797  if (i < fSlaveProcTime.GetSize())
4798  if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4799  pisum.fProcTime = fSlaveProcTime[i];
4800  if (i < fSlaveEvtRti.GetSize())
4801  if (fSlaveEvtRti[i] > -1.) {
4802  pisum.fEvtRateI += fSlaveEvtRti[i];
4803  nerti++;
4804  }
4805  if (i < fSlaveMBRti.GetSize())
4806  if (fSlaveMBRti[i] > -1.) {
4807  pisum.fMBRateI += fSlaveMBRti[i];
4808  nsrti++;
4809  }
4810  if (i < fSlaveActW.GetSize())
4811  pisum.fActWorkers += fSlaveActW[i];
4812  if (i < fSlaveTotS.GetSize())
4813  if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4814  pisum.fTotSessions = fSlaveTotS[i];
4815  if (i < fSlaveEffS.GetSize())
4816  if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4817  pisum.fEffSessions = fSlaveEffS[i];
4818  }
4819  pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4820 
4821  Progress(&pisum);
4822  }
4823 }
4824 
4825 ////////////////////////////////////////////////////////////////////////////////
4826 /// Send progress and feedback to client.
4827 
4829 {
4830  if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4831 
4832  Int_t i;
4833  Long64_t tot = 0;
4834  Long64_t proc = 0;
4835  Long64_t bytes = 0;
4836  Float_t init = -1.;
4837  Float_t ptime = -1.;
4838  Float_t erti = 0.;
4839  Float_t srti = 0.;
4840  Int_t nerti = 0;
4841  Int_t nsrti = 0;
4842  for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4843  tot += fSlaveTotals[i];
4844  if (i < fSlaveProgress.GetSize())
4845  proc += fSlaveProgress[i];
4846  if (i < fSlaveBytesRead.GetSize())
4847  bytes += fSlaveBytesRead[i];
4848  if (i < fSlaveInitTime.GetSize())
4849  if (fSlaveInitTime[i] > -1. && (