Logo ROOT   6.16/01
Reference Guide
TProofPlayer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 07/01/02
3
4/*************************************************************************
5 * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TProofPlayer
13\ingroup proofkernel
14
15Internal class steering processing in PROOF.
16Instances of the TProofPlayer class are created on the worker nodes
17per session and do the processing.
18Instances of its subclass - TProofPlayerRemote are created per each
19query on the master(s) and on the client. On the master(s),
20TProofPlayerRemote coordinate processing, check the dataset, create
21the packetizer and take care of merging the results of the workers.
22The instance on the client collects information on the input
23(dataset and selector), it invokes the Begin() method and finalizes
24the query by calling Terminate().
25
26*/
27
28#include "TProofDraw.h"
29#include "TProofPlayer.h"
30#include "THashList.h"
31#include "TEnv.h"
32#include "TEventIter.h"
33#include "TVirtualPacketizer.h"
34#include "TSelector.h"
35#include "TSocket.h"
36#include "TProofServ.h"
37#include "TProof.h"
38#include "TProofOutputFile.h"
39#include "TProofSuperMaster.h"
40#include "TSlave.h"
41#include "TClass.h"
42#include "TROOT.h"
43#include "TError.h"
44#include "TException.h"
45#include "MessageTypes.h"
46#include "TMessage.h"
47#include "TDSetProxy.h"
48#include "TString.h"
49#include "TSystem.h"
50#include "TFile.h"
51#include "TFileCollection.h"
52#include "TFileInfo.h"
53#include "TFileMerger.h"
54#include "TProofDebug.h"
55#include "TTimer.h"
56#include "TMap.h"
57#include "TPerfStats.h"
58#include "TStatus.h"
59#include "TEventList.h"
60#include "TProofLimitsFinder.h"
61#include "THashList.h"
62#include "TSortedList.h"
63#include "TTree.h"
64#include "TEntryList.h"
65#include "TDSet.h"
66#include "TDrawFeedback.h"
67#include "TNamed.h"
68#include "TObjString.h"
69#include "TQueryResult.h"
70#include "TMD5.h"
71#include "TMethodCall.h"
72#include "TObjArray.h"
73#include "TH1.h"
74#include "TVirtualMonitoring.h"
75#include "TParameter.h"
77#include "TStopwatch.h"
78
79// Timeout exception
80#define kPEX_STOPPED 1001
81#define kPEX_ABORTED 1002
82
83// To flag an abort condition: use a local static variable to avoid
84// warnings about problems with longjumps
86
87class TAutoBinVal : public TNamed {
88private:
89 Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
90
91public:
92 TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
93 Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
94 {
95 fXmin = xmin; fXmax = xmax;
96 fYmin = ymin; fYmax = ymax;
97 fZmin = zmin; fZmax = zmax;
98 }
99 void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
100 Double_t& ymax, Double_t& zmin, Double_t& zmax)
101 {
102 xmin = fXmin; xmax = fXmax;
103 ymin = fYmin; ymax = fYmax;
104 zmin = fZmin; zmax = fZmax;
105 }
106
107};
108
109//
110// Special timer to dispatch pending events while processing
111////////////////////////////////////////////////////////////////////////////////
112
113class TDispatchTimer : public TTimer {
114private:
115 TProofPlayer *fPlayer;
116
117public:
118 TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
119
120 Bool_t Notify();
121};
122////////////////////////////////////////////////////////////////////////////////
123/// Handle expiration of the timer associated with dispatching pending
124/// events while processing. We must act as fast as possible here, so
125/// we just set a flag submitting a request for dispatching pending events
126
127Bool_t TDispatchTimer::Notify()
128{
129 if (gDebug > 0) printf("TDispatchTimer::Notify: called!\n");
130
131 fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
132
133 // Needed for the next shot
134 Reset();
135 return kTRUE;
136}
137
138//
139// Special timer to notify reach of max packet proc time
140////////////////////////////////////////////////////////////////////////////////
141
142class TProctimeTimer : public TTimer {
143private:
144 TProofPlayer *fPlayer;
145
146public:
147 TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
148
149 Bool_t Notify();
150};
151////////////////////////////////////////////////////////////////////////////////
152/// Handle expiration of the timer associated with dispatching pending
153/// events while processing. We must act as fast as possible here, so
154/// we just set a flag submitting a request for dispatching pending events
155
156Bool_t TProctimeTimer::Notify()
157{
158 if (gDebug > 0) printf("TProctimeTimer::Notify: called!\n");
159
160 fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
161
162 // One shot only
163 return kTRUE;
164}
165
166//
167// Special timer to handle stop/abort request via exception raising
168////////////////////////////////////////////////////////////////////////////////
169
170class TStopTimer : public TTimer {
171private:
172 Bool_t fAbort;
173 TProofPlayer *fPlayer;
174
175public:
176 TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
177
178 Bool_t Notify();
179};
180
181////////////////////////////////////////////////////////////////////////////////
182/// Constructor for the timer to stop/abort processing.
183/// The 'timeout' is in seconds.
184/// Make sure that 'to' make sense, i.e. not larger than 10 days;
185/// the minimum value is 10 ms (0 does not seem to start the timer ...).
186
187TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
188 : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
189{
190 if (gDebug > 0)
191 Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
192
193 fPlayer = p;
194 fAbort = abort;
195
196 if (gDebug > 1)
197 Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
198}
199
200////////////////////////////////////////////////////////////////////////////////
201/// Handle the signal coming from the expiration of the timer
202/// associated with an abort or stop request.
203/// We raise an exception which will be processed in the
204/// event loop.
205
206Bool_t TStopTimer::Notify()
207{
208 if (gDebug > 0) printf("TStopTimer::Notify: called!\n");
209
210 if (fAbort)
212 else
214
215 return kTRUE;
216}
217
218//------------------------------------------------------------------------------
219
221
223
224////////////////////////////////////////////////////////////////////////////////
225/// Default ctor.
226
228 : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
229 fFeedbackTimer(0), fFeedbackPeriod(2000),
230 fEvIter(0), fSelStatus(0),
231 fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
232 fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
233 fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
234 fProcTimeTimer(0), fProcTime(0),
235 fOutputFile(0),
236 fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
237{
238 fInput = new TList;
245
246 static Bool_t initLimitsFinder = kFALSE;
247 if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
249 initLimitsFinder = kTRUE;
250 }
251
252}
253
254////////////////////////////////////////////////////////////////////////////////
255/// Destructor.
256
258{
259 fInput->Clear("nodelete");
261 // The output list is owned by fSelector and destroyed in there
270}
271
272////////////////////////////////////////////////////////////////////////////////
273/// Set processing bit according to 'on'
274
276{
277 if (on)
279 else
281}
282
283////////////////////////////////////////////////////////////////////////////////
284/// Stop the process after this event. If timeout is positive, start
285/// a timer firing after timeout seconds to hard-stop time-expensive
286/// events.
287
289{
290 if (gDebug > 0)
291 Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
292
293 if (fEvIter != 0)
294 fEvIter->StopProcess(abort);
295 Long_t to = 1;
296 if (abort == kTRUE) {
298 } else {
300 to = timeout;
301 }
302 // Start countdown, if needed
303 if (to > 0)
304 SetStopTimer(kTRUE, abort, to);
305}
306
307////////////////////////////////////////////////////////////////////////////////
308/// Enable/disable the timer to dispatch pening events while processing.
309
311{
314 if (on) {
315 fDispatchTimer = new TDispatchTimer(this);
317 }
318}
319
320////////////////////////////////////////////////////////////////////////////////
321/// Enable/disable the timer to stop/abort processing.
322/// The 'timeout' is in seconds.
323
325{
326 std::lock_guard<std::mutex> lock(fStopTimerMtx);
327
328 // Clean-up the timer
330 if (on) {
331 // create timer
332 fStopTimer = new TStopTimer(this, abort, timeout);
333 // Start the countdown
334 fStopTimer->Start();
335 if (gDebug > 0)
336 Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
337 (abort ? "ABORT" : "STOP"), timeout);
338 } else {
339 if (gDebug > 0)
340 Info ("SetStopTimer", "timer STOPPED");
341 }
342}
343
344////////////////////////////////////////////////////////////////////////////////
345/// Add query result to the list, making sure that there are no
346/// duplicates.
347
349{
350 if (!q) {
351 Warning("AddQueryResult","query undefined - do nothing");
352 return;
353 }
354
355 // Treat differently normal and draw queries
356 if (!(q->IsDraw())) {
357 if (!fQueryResults) {
358 fQueryResults = new TList;
360 } else {
361 TIter nxr(fQueryResults);
362 TQueryResult *qr = 0;
363 TQueryResult *qp = 0;
364 while ((qr = (TQueryResult *) nxr())) {
365 // If same query, remove old version and break
366 if (*qr == *q) {
368 delete qr;
369 break;
370 }
371 // Record position according to start time
372 if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
373 qp = qr;
374 }
375
376 if (!qp) {
378 } else {
380 }
381 }
382 } else if (IsClient()) {
383 // If max reached, eliminate first the oldest one
385 TIter nxr(fQueryResults);
386 TQueryResult *qr = 0;
387 while ((qr = (TQueryResult *) nxr())) {
388 // If same query, remove old version and break
389 if (qr->IsDraw()) {
390 fDrawQueries--;
392 delete qr;
393 break;
394 }
395 }
396 }
397 // Add new draw query
399 fDrawQueries++;
400 if (!fQueryResults)
401 fQueryResults = new TList;
403 }
404 }
405}
406
407////////////////////////////////////////////////////////////////////////////////
408/// Remove all query result instances referenced 'ref' from
409/// the list of results.
410
412{
413 if (fQueryResults) {
414 TIter nxq(fQueryResults);
415 TQueryResult *qr = 0;
416 while ((qr = (TQueryResult *) nxq())) {
417 if (qr->Matches(ref)) {
419 delete qr;
420 }
421 }
422 }
423}
424
425////////////////////////////////////////////////////////////////////////////////
426/// Get query result instances referenced 'ref' from
427/// the list of results.
428
430{
431 if (fQueryResults) {
432 if (ref && strlen(ref) > 0) {
433 TIter nxq(fQueryResults);
434 TQueryResult *qr = 0;
435 while ((qr = (TQueryResult *) nxq())) {
436 if (qr->Matches(ref))
437 return qr;
438 }
439 } else {
440 // Get last
441 return (TQueryResult *) fQueryResults->Last();
442 }
443 }
444
445 // Nothing found
446 return (TQueryResult *)0;
447}
448
449////////////////////////////////////////////////////////////////////////////////
450/// Set current query and save previous value.
451
453{
455 fQuery = q;
456}
457
458////////////////////////////////////////////////////////////////////////////////
459/// Add object to input list.
460
462{
463 fInput->Add(inp);
464}
465
466////////////////////////////////////////////////////////////////////////////////
467/// Clear input list.
468
470{
471 fInput->Clear();
472}
473
474////////////////////////////////////////////////////////////////////////////////
475/// Get output object by name.
476
478{
479 if (fOutput)
480 return fOutput->FindObject(name);
481 return 0;
482}
483
484////////////////////////////////////////////////////////////////////////////////
485/// Get output list.
486
488{
489 TList *ol = fOutput;
490 if (!ol && fQuery)
491 ol = fQuery->GetOutputList();
492 return ol;
493}
494
495////////////////////////////////////////////////////////////////////////////////
496/// Reinitialize fSelector using the selector files in the query result.
497/// Needed when Finalize is called after a Process execution for the same
498/// selector name.
499
501{
502 Int_t rc = 0;
503
504 // Make sure we have a query
505 if (!qr) {
506 Info("ReinitSelector", "query undefined - do nothing");
507 return -1;
508 }
509
510 // Selector name
511 TString selec = qr->GetSelecImp()->GetName();
512 if (selec.Length() <= 0) {
513 Info("ReinitSelector", "selector name undefined - do nothing");
514 return -1;
515 }
516
517 // Find out if this is a standard selection used for Draw actions
518 Bool_t stdselec = TSelector::IsStandardDraw(selec);
519
520 // Find out if this is a precompiled selector: in such a case we do not
521 // have the code in TMacros, so we must rely on local libraries
522 Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
523
524 // If not, find out if it needs to be expanded
525 TString ipathold;
526 if (!stdselec && !compselec) {
527 // Check checksums for the versions of the selector files
528 Bool_t expandselec = kTRUE;
529 TString dir, ipath;
530 char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
531 if (selc) {
532 // Check checksums
533 TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
534 // Implementation files
535 md5icur = TMD5::FileChecksum(selc);
536 md5iold = qr->GetSelecImp()->Checksum();
537 // Header files
538 TString selh(selc);
539 Int_t dot = selh.Last('.');
540 if (dot != kNPOS) selh.Remove(dot);
541 selh += ".h";
543 md5hcur = TMD5::FileChecksum(selh);
544 md5hold = qr->GetSelecHdr()->Checksum();
545
546 // If nothing has changed nothing to do
547 if (md5hcur && md5hold && md5icur && md5iold)
548 if (*md5hcur == *md5hold && *md5icur == *md5iold)
549 expandselec = kFALSE;
550
551 SafeDelete(md5icur);
552 SafeDelete(md5hcur);
553 SafeDelete(md5iold);
554 SafeDelete(md5hold);
555 if (selc) delete [] selc;
556 }
557
558 Bool_t ok = kTRUE;
559 // Expand selector files, if needed
560 if (expandselec) {
561
562 ok = kFALSE;
563 // Expand files in a temporary directory
564 TUUID u;
565 dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
566 if (!(gSystem->MakeDirectory(dir))) {
567
568 // Export implementation file
569 selec = Form("%s/%s",dir.Data(),selec.Data());
570 qr->GetSelecImp()->SaveSource(selec);
571
572 // Export header file
573 TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
574 qr->GetSelecHdr()->SaveSource(seleh);
575
576 // Adjust include path
577 ipathold = gSystem->GetIncludePath();
578 ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
579 gSystem->SetIncludePath(ipath.Data());
580
581 ok = kTRUE;
582 }
583 }
584 TString opt(qr->GetOptions());
585 Ssiz_t id = opt.Last('#');
586 if (id != kNPOS && id < opt.Length() - 1)
587 selec += opt(id + 1, opt.Length());
588
589 if (!ok) {
590 Info("ReinitSelector", "problems locating or exporting selector files");
591 return -1;
592 }
593 }
594
595 // Cleanup previous stuff
597 fSelectorClass = 0;
598
599 // Init the selector now
600 Int_t iglevelsave = gErrorIgnoreLevel;
601 if (compselec)
602 // Silent error printout on first attempt
604
605 if ((fSelector = TSelector::GetSelector(selec))) {
606 if (compselec)
607 gErrorIgnoreLevel = iglevelsave; // restore ignore level
608 fSelectorClass = fSelector->IsA();
610
611 } else {
612 if (compselec) {
613 gErrorIgnoreLevel = iglevelsave; // restore ignore level
614 // Retry by loading first the libraries listed in TQueryResult, if any
615 if (strlen(qr->GetLibList()) > 0) {
616 TString sl(qr->GetLibList());
617 TObjArray *oa = sl.Tokenize(" ");
618 if (oa) {
619 Bool_t retry = kFALSE;
620 TIter nxl(oa);
621 TObjString *os = 0;
622 while ((os = (TObjString *) nxl())) {
623 TString lib = gSystem->BaseName(os->GetName());
624 if (lib != "lib") {
625 lib.ReplaceAll("-l", "lib");
626 if (gSystem->Load(lib) == 0)
627 retry = kTRUE;
628 }
629 }
630 // Retry now, if the case
631 if (retry)
633 }
634 }
635 }
636 if (!fSelector) {
637 if (compselec)
638 Info("ReinitSelector", "compiled selector re-init failed:"
639 " automatic reload unsuccessful:"
640 " please load manually the correct library");
641 rc = -1;
642 }
643 }
644 if (fSelector) {
645 // Draw needs to reinit temp histos
647 if (stdselec) {
648 ((TProofDraw *)fSelector)->DefVar();
649 } else {
650 // variables may have been initialized in Begin()
651 fSelector->Begin(0);
652 }
653 }
654
655 // Restore original include path, if needed
656 if (ipathold.Length() > 0)
657 gSystem->SetIncludePath(ipathold.Data());
658
659 return rc;
660}
661
662////////////////////////////////////////////////////////////////////////////////
663/// Incorporate output object (may not be used in this class).
664
666{
667 MayNotUse("AddOutputObject");
668 return -1;
669}
670
671////////////////////////////////////////////////////////////////////////////////
672/// Incorporate output list (may not be used in this class).
673
675{
676 MayNotUse("AddOutput");
677}
678
679////////////////////////////////////////////////////////////////////////////////
680/// Store output list (may not be used in this class).
681
683{
684 MayNotUse("StoreOutput");
685}
686
687////////////////////////////////////////////////////////////////////////////////
688/// Store feedback list (may not be used in this class).
689
691{
692 MayNotUse("StoreFeedback");
693}
694
695////////////////////////////////////////////////////////////////////////////////
696/// Report progress (may not be used in this class).
697
698void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
699{
700 MayNotUse("Progress");
701}
702
703////////////////////////////////////////////////////////////////////////////////
704/// Report progress (may not be used in this class).
705
706void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
707 Long64_t /*bytesread*/,
708 Float_t /*evtRate*/, Float_t /*mbRate*/,
709 Float_t /*evtrti*/, Float_t /*mbrti*/)
710{
711 MayNotUse("Progress");
712}
713
714////////////////////////////////////////////////////////////////////////////////
715/// Report progress (may not be used in this class).
716
718{
719 MayNotUse("Progress");
720}
721
722////////////////////////////////////////////////////////////////////////////////
723/// Set feedback list (may not be used in this class).
724
726{
727 MayNotUse("Feedback");
728}
729
730////////////////////////////////////////////////////////////////////////////////
731/// Draw feedback creation proxy. When accessed via TProof avoids
732/// link dependency on libProofPlayer.
733
735{
736 return new TDrawFeedback(p);
737}
738
739////////////////////////////////////////////////////////////////////////////////
740/// Set draw feedback option.
741
743{
744 if (f)
745 f->SetOption(opt);
746}
747
748////////////////////////////////////////////////////////////////////////////////
749/// Delete draw feedback object.
750
752{
753 delete f;
754}
755
756////////////////////////////////////////////////////////////////////////////////
757/// Save the partial results of this query to a dedicated file under the user
758/// data directory. The file name has the form
759/// <session_tag>.q<query_seq_num>.root
760/// The file pat and the file are created if not existing already.
761/// Only objects in the outputlist not being TProofOutputFile are saved.
762/// The packets list 'packets' is saved if given.
763/// Trees not attached to any file are attached to the open file.
764/// If 'queryend' is kTRUE evrything is written out (TTrees included).
765/// The actual saving action is controlled by 'force' and by fSavePartialResults /
766/// fSaveResultsPerPacket:
767///
768/// fSavePartialResults = kFALSE/kTRUE no-saving/saving
769/// fSaveResultsPerPacket = kFALSE/kTRUE save-per-query/save-per-packet
770///
771/// The function CheckMemUsage sets fSavePartialResults = 1 if fSaveMemThreshold > 0 and
772/// ProcInfo_t::fMemResident >= fSaveMemThreshold: from that point on partial results
773/// are always saved and expensive calls to TSystem::GetProcInfo saved.
774/// The switch fSaveResultsPerPacket is instead controlled by the user or admin
775/// who can also force saving in all cases; parameter PROOF_SavePartialResults or
776/// RC env ProofPlayer.SavePartialResults .
777/// However, if 'force' is kTRUE, fSavePartialResults and fSaveResultsPerPacket
778/// are ignored.
779/// Return -1 in case of problems, 0 otherwise.
780
782{
783 Bool_t save = (force || (fSavePartialResults &&
784 (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
785 if (!save) {
786 PDB(kOutput, 2)
787 Info("SavePartialResults", "partial result saving disabled");
788 return 0;
789 }
790
791 // Sanity check
792 if (!gProofServ) {
793 Error("SavePartialResults", "gProofServ undefined: something really wrong going on!!!");
794 return -1;
795 }
796 if (!fOutput) {
797 Error("SavePartialResults", "fOutput undefined: something really wrong going on!!!");
798 return -1;
799 }
800
801 PDB(kOutput, 1)
802 Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}",
804
805 // Get list of processed packets from the iterator
806 PDB(kOutput, 2) Info("SavePartialResults", "fEvIter: %p", fEvIter);
807
808 TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
809 PDB(kOutput, 2) Info("SavePartialResults", "list of packets: %p, sz: %d",
810 packets, (packets ? packets->GetSize(): -1));
811
812 // Open the file
813 const char *oopt = "UPDATE";
814 // Check if the file has already been defined
815 TString baseName(fOutputFilePath);
816 if (fOutputFilePath.IsNull()) {
817 baseName.Form("output-%s.q%d.root", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
818 if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
819 fOutputFilePath.Form("%s/%s?%s", gProofServ->GetDataDir(), baseName.Data(),
821 } else {
822 fOutputFilePath.Form("%s/%s", gProofServ->GetDataDir(), baseName.Data());
823 }
824 Info("SavePartialResults", "file with (partial) output: '%s'", fOutputFilePath.Data());
825 oopt = "RECREATE";
826 }
827 // Open the file in write mode
828 if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
830 Error("SavePartialResults", "cannot open '%s' for writing", fOutputFilePath.Data());
832 return -1;
833 }
834
835 // Save current directory
836 TDirectory *curdir = gDirectory;
837 fOutputFile->cd();
838
839 // Write first the packets list, if required
840 if (packets) {
841 TDirectory *packetsDir = fOutputFile->mkdir("packets");
842 if (packetsDir) packetsDir->cd();
844 fOutputFile->cd();
845 }
846
847 Bool_t notempty = kFALSE;
848 // Write out the output list
849 TList torm;
850 TIter nxo(fOutput);
851 TObject *o = 0;
852 while ((o = nxo())) {
853 // Skip output file drivers
854 if (o->InheritsFrom(TProofOutputFile::Class())) continue;
855 // Skip control objets
856 if (!strncmp(o->GetName(), "PROOF_", 6)) continue;
857 // Skip data members mapping
859 // Skip missing file info
860 if (!strcmp(o->GetName(), "MissingFiles")) continue;
861 // Trees need a special treatment
862 if (o->InheritsFrom("TTree")) {
863 TTree *t = (TTree *) o;
864 TDirectory *d = t->GetDirectory();
865 // If the tree is not attached to any file ...
866 if (!d || (d && !d->InheritsFrom("TFile"))) {
867 // ... we attach it
869 }
870 if (t->GetDirectory() == fOutputFile) {
871 if (queryend) {
872 // ... we write it out
874 // At least something in the file
875 notempty = kTRUE;
876 // Flag for removal from the outputlist
877 torm.Add(o);
878 // Prevent double-deletion attempts
879 t->SetDirectory(0);
880 } else {
881 // ... or we set in automatic flush mode
882 t->SetAutoFlush();
883 }
884 }
885 } else if (queryend || fSaveResultsPerPacket) {
886 // Save overwriting what's already there
888 // At least something in the file
889 notempty = kTRUE;
890 // Flag for removal from the outputlist
891 if (queryend) torm.Add(o);
892 }
893 }
894
895 // Restore previous directory
896 gDirectory = curdir;
897
898 // Close the file if required
899 if (notempty) {
900 if (!fOutput->FindObject(baseName)) {
901 TProofOutputFile *po = 0;
902 // Get directions
903 TNamed *nm = (TNamed *) fInput->FindObject("PROOF_DefaultOutputOption");
904 TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
905 if (nm && oname.BeginsWith("ds:")) {
906 oname.Replace(0, 3, "");
907 TString qtag =
909 oname.ReplaceAll("<qtag>", qtag);
910 // Create the TProofOutputFile for dataset creation
911 po = new TProofOutputFile(baseName, "DRO", oname.Data());
912 } else {
913 Bool_t hasddir = kFALSE;
914 // Create the TProofOutputFile for automatic merging
915 po = new TProofOutputFile(baseName, "M");
916 if (oname.BeginsWith("of:")) oname.Replace(0, 3, "");
917 if (gProofServ->IsTopMaster()) {
918 if (!strcmp(TUrl(oname, kTRUE).GetProtocol(), "file")) {
919 TString dsrv;
921 TProofServ::FilterLocalroot(oname, dsrv);
922 oname.Insert(0, dsrv);
923 }
924 } else {
925 if (nm) {
926 // The name has been sent by the client: resolve local place holders
927 oname.ReplaceAll("<file>", baseName);
928 } else {
929 // We did not get any indication; the final file will be in the datadir on
930 // the top master and it will be resolved there
931 oname.Form("<datadir>/%s", baseName.Data());
932 hasddir = kTRUE;
933 }
934 }
935 po->SetOutputFileName(oname.Data());
936 if (hasddir)
937 // Reset the bit, so that <datadir> has a chance to be resolved in AddOutputObject
939 po->SetName(gSystem->BaseName(oname.Data()));
940 }
942 fOutput->Add(po);
943 // Flag the nature of this file
945 }
946 }
949
950 // If last call, cleanup the output list from objects saved to file
951 if (queryend && torm.GetSize() > 0) {
952 TIter nxrm(&torm);
953 while ((o = nxrm())) { fOutput->Remove(o); }
954 }
955 torm.SetOwner(kFALSE);
956
957 PDB(kOutput, 1)
958 Info("SavePartialResults", "partial results saved to file");
959 // We are done
960 return 0;
961}
962
963////////////////////////////////////////////////////////////////////////////////
964/// Make sure that a valid selector object
965/// Return -1 in case of problems, 0 otherwise
966
967Int_t TProofPlayer::AssertSelector(const char *selector_file)
968{
969 if (selector_file && strlen(selector_file)) {
971
972 // Get selector files from cache
974 if (gProofServ) {
977 }
978
979 fSelector = TSelector::GetSelector(selector_file);
980
981 if (gProofServ) {
984 }
985
986 if (!fSelector) {
987 Error("AssertSelector", "cannot load: %s", selector_file );
988 return -1;
989 }
990
992 Info("AssertSelector", "Processing via filename (%s)", selector_file);
993 } else if (!fSelector) {
994 Error("AssertSelector", "no TSelector object define : cannot continue!");
995 return -1;
996 } else {
997 Info("AssertSelector", "Processing via TSelector object");
998 }
999 // Done
1000 return 0;
1001}
1002////////////////////////////////////////////////////////////////////////////////
1003/// Update fProgressStatus
1004
1006{
1007 if (fProgressStatus) {
1015 fProcessedRun = 0;
1016 }
1017}
1018
1019////////////////////////////////////////////////////////////////////////////////
1020/// Process specified TDSet on PROOF worker.
1021/// The return value is -1 in case of error and TSelector::GetStatus()
1022/// in case of success.
1023
1024Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
1025 Option_t *option, Long64_t nentries,
1027{
1028 PDB(kGlobal,1) Info("Process","Enter");
1029
1031 fOutput = 0;
1032
1033 TCleanup clean(this);
1034
1035 fSelectorClass = 0;
1036 TString wmsg;
1037 TRY {
1038 if (AssertSelector(selector_file) != 0 || !fSelector) {
1039 Error("Process", "cannot assert the selector object");
1040 return -1;
1041 }
1042
1043 fSelectorClass = fSelector->IsA();
1044 Int_t version = fSelector->Version();
1045 if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1046
1048
1049 if (gProofServ)
1051
1052 fSelStatus = new TStatus;
1054
1055 fSelector->SetOption(option);
1057
1058 // If in sequential (0-PROOF) mode validate the data set to get
1059 // the number of entries
1061 if (fTotalEvents < 0 && gProofServ &&
1063 dset->Validate();
1064 dset->Reset();
1065 TDSetElement *e = 0;
1066 while ((e = dset->Next())) {
1067 fTotalEvents += e->GetNum();
1068 }
1069 }
1070
1071 dset->Reset();
1072
1073 // Set parameters controlling the iterator behaviour
1074 Int_t useTreeCache = 1;
1075 if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
1076 if (useTreeCache > -1 && useTreeCache < 2)
1077 gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
1078 }
1079 Long64_t cacheSize = -1;
1080 if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
1081 TString sz = TString::Format("%lld", cacheSize);
1082 gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
1083 }
1084 // Parallel unzipping
1085 Int_t useParallelUnzip = 0;
1086 if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1087 if (useParallelUnzip > -1 && useParallelUnzip < 2)
1088 gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
1089 }
1090 // OS file caching (Mac Os X only)
1091 Int_t dontCacheFiles = 0;
1092 if (TProof::GetParameter(fInput, "PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1093 if (dontCacheFiles == 1)
1094 gEnv->SetValue("ProofPlayer.DontCacheFiles", 1);
1095 }
1097
1098 // Control file object swap
1099 // <how>*10 + <force>
1100 // <how> = 0 end of run
1101 // 1 after each packet
1102 // <force> = 0 no, swap only if memory threshold is reached
1103 // 1 swap in all cases, accordingly to <how>
1104 Int_t opt = 0;
1105 if (TProof::GetParameter(fInput, "PROOF_SavePartialResults", opt) != 0) {
1106 opt = gEnv->GetValue("ProofPlayer.SavePartialResults", 0);
1107 }
1108 fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1109 fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1110 Info("Process", "save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1111
1112 // Memory threshold for file object swap
1113 Float_t memfrac = gEnv->GetValue("ProofPlayer.SaveMemThreshold", -1.);
1114 if (memfrac > 0.) {
1115 // The threshold is per core
1116 SysInfo_t si;
1117 if (gSystem->GetSysInfo(&si) == 0) {
1118 fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1119 Info("Process", "memory threshold for saving objects to file set to %ld kB",
1121 } else {
1122 Error("Process", "cannot get SysInfo_t (!)");
1123 }
1124 }
1125
1126 if (version == 0) {
1127 PDB(kLoop,1) Info("Process","Call Begin(0)");
1128 fSelector->Begin(0);
1129 } else {
1130 if (IsClient()) {
1131 // on client (for local run)
1132 PDB(kLoop,1) Info("Process","Call Begin(0)");
1133 fSelector->Begin(0);
1134 }
1136 PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1137 fSelector->SlaveBegin(0); // Init is called explicitly
1138 // from GetNextEvent()
1139 }
1140 }
1141
1142 } CATCH(excode) {
1144 Error("Process","exception %d caught", excode);
1146 return -1;
1147 } ENDTRY;
1148
1149 // Save the results, if needed, closing the file
1150 if (SavePartialResults(kFALSE) < 0)
1151 Warning("Process", "problems seetting up file-object swapping");
1152
1153 // Create feedback lists, if required
1154 SetupFeedback();
1155
1158
1159 PDB(kLoop,1)
1160 Info("Process","Looping over Process()");
1161
1162 // get the byte read counter at the beginning of processing
1165 fProcessedRun = 0;
1166 // force the first monitoring info
1169
1170 // Start asynchronous timer to dispatch pending events
1172
1173 // Loop over range
1174 gAbort = kFALSE;
1175 Long64_t entry;
1178
1179 TRY {
1180
1181 Int_t mrc = -1;
1182 // Get the frequency for checking memory consumption and logging information
1183 Long64_t memlogfreq = -1;
1184 if (((mrc = TProof::GetParameter(fInput, "PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1185 Long64_t singleshot = 1;
1186 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1187 TString lastMsg("(unfortunately no detailed info is available about current packet)");
1188
1189 // Initial memory footprint
1190 if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1191 Error("Process", "%s", wmsg.Data());
1192 wmsg.Insert(0, TString::Format("ERROR:%s, after SlaveBegin(), ", gProofServ->GetOrdinal()));
1193 fSelStatus->Add(wmsg.Data());
1194 if (gProofServ) {
1197 }
1200 } else if (!wmsg.IsNull()) {
1201 Warning("Process", "%s", wmsg.Data());
1202 }
1203
1204 TPair *currentElem = 0;
1205 // The event loop on the worker
1206 Long64_t fst = -1, num;
1207 Long_t maxproctime = -1;
1208 Bool_t newrun = kFALSE;
1209 while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1212 // This is needed by the inflate infrastructure to calculate
1213 // sleeping times
1215
1216 // Give the possibility to the selector to access additional info in the
1217 // incoming packet
1218 if (dset->Current()) {
1219 if (!currentElem) {
1220 currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
1221 fInput->Add(currentElem);
1222 } else {
1223 if (currentElem->Value() != dset->Current()) {
1224 currentElem->SetValue(dset->Current());
1225 } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1227 }
1228 }
1230 if (dset->TestBit(TDSet::kEmpty)) {
1231 lastMsg = "check logs for possible stacktrace - last cycle:";
1232 } else {
1233 TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
1234 TString fn = (elem) ? elem->GetFileName() : "<undef>";
1235 lastMsg.Form("while processing dset:'%s', file:'%s'"
1236 " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1237 }
1238 TProofServ::SetLastMsg(lastMsg);
1239 }
1240 // Set the max proc time, if any
1241 if (dset->Current()->GetMaxProcTime() >= 0.)
1242 maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1243 newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1244 }
1245
1248 // Setup packet proc time measurement
1249 if (maxproctime > 0) {
1250 if (!fProcTimeTimer) fProcTimeTimer = new TProctimeTimer(this, maxproctime);
1251 fProcTimeTimer->Start(maxproctime, kTRUE); // One shot
1252 if (!fProcTime) fProcTime = new TStopwatch();
1253 fProcTime->Reset(); // Reset counters
1254 }
1255 Long64_t refnum = num;
1256 if (refnum < 0 && maxproctime <= 0) {
1257 wmsg.Form("neither entries nor max proc time specified:"
1258 " risk of infinite loop: processing aborted");
1259 Error("Process", "%s", wmsg.Data());
1260 if (gProofServ) {
1261 wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1264 }
1267 break;
1268 }
1269 while (refnum < 0 || num--) {
1270
1271 // Did we use all our time?
1273 fProcTime->Stop();
1274 if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1275 // How much are we left with?
1276 Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1277 if (xleft < 0.2) {
1278 // Give another try, 1.5 times the remaining measured expected time
1279 Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1281 fProcTimeTimer->Start(mpt, kTRUE); // One shot
1283 }
1284 }
1286 Info("Process", "max proc time reached (%ld msecs): packet processing stopped:\n%s",
1287 maxproctime, lastMsg.Data());
1288
1289 break;
1290 }
1291 }
1292
1295
1296 // Get the netry number, taking into account entry or event lists
1297 entry = fEvIter->GetEntryNumber(fst);
1298 fst++;
1299
1300 // Set the last entry
1302
1303 if (fSelector->Version() == 0) {
1304 PDB(kLoop,3)
1305 Info("Process","Call ProcessCut(%lld)", entry);
1306 if (fSelector->ProcessCut(entry)) {
1307 PDB(kLoop,3)
1308 Info("Process","Call ProcessFill(%lld)", entry);
1309 fSelector->ProcessFill(entry);
1310 }
1311 } else {
1312 PDB(kLoop,3)
1313 Info("Process","Call Process(%lld)", entry);
1314 fSelector->Process(entry);
1317 break;
1318 } else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1319 Info("Process", "packet processing aborted following the"
1320 " selector settings:\n%s", lastMsg.Data());
1323 }
1324 }
1326
1327 // Check the memory footprint, if required
1328 if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1329 if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1330 Error("Process", "%s", wmsg.Data());
1331 if (gProofServ) {
1332 wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ",
1333 gProofServ->GetOrdinal(), entry));
1335 }
1339 break;
1340 } else {
1341 if (!wmsg.IsNull()) {
1342 Warning("Process", "%s", wmsg.Data());
1343 if (gProofServ) {
1344 wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ",
1345 gProofServ->GetOrdinal(), entry));
1347 }
1348 }
1349 }
1350 }
1354 }
1356 if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted()) break;
1357
1358 // Make sure that the selector abort status is reset
1360 fSelector->Abort("status reset", TSelector::kContinue);
1361 }
1362 }
1363
1364 } CATCH(excode) {
1365 if (excode == kPEX_STOPPED) {
1366 Info("Process","received stop-process signal");
1368 } else if (excode == kPEX_ABORTED) {
1369 gAbort = kTRUE;
1370 Info("Process","received abort-process signal");
1372 } else {
1373 Error("Process","exception %d caught", excode);
1374 // Perhaps we need a dedicated status code here ...
1375 gAbort = kTRUE;
1377 }
1379 } ENDTRY;
1380
1381 // Clean-up the envelop for the current element
1382 TPair *currentElem = 0;
1383 if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
1384 if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1385 delete currentElem->Key();
1386 delete currentElem;
1387 }
1388 }
1389
1390 // Final memory footprint
1391 Long64_t singleshot = 1;
1392 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1393 Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1394 if (!wmsg.IsNull()) Warning("Process", "%s (%s)", wmsg.Data(), shrc ? "warn" : "hwm");
1395
1396 PDB(kGlobal,2)
1397 Info("Process","%lld events processed", fProgressStatus->GetEntries());
1398
1399 if (gMonitoringWriter) {
1403 }
1404
1405 // Stop active timers
1407 if (fStopTimer != 0)
1409 if (fFeedbackTimer != 0)
1410 HandleTimer(0);
1411
1412 StopFeedback();
1413
1414 // Save the results, if needed, closing the file
1415 if (SavePartialResults(kTRUE) < 0)
1416 Warning("Process", "problems saving the results to file");
1417
1419
1420 // Finalize
1421
1422 if (fExitStatus != kAborted) {
1423
1424 TIter nxo(GetOutputList());
1425 TObject *o = 0;
1426 while ((o = nxo())) {
1427 // Special treatment for files
1428 if (o->IsA() == TProofOutputFile::Class()) {
1430 of->Print();
1432 const char *dir = of->GetDir();
1433 if (!dir || (dir && strlen(dir) <= 0)) {
1435 } else if (dir && strlen(dir) > 0) {
1436 TUrl u(dir);
1437 if (!strcmp(u.GetHost(), "localhost") || !strcmp(u.GetHost(), "127.0.0.1") ||
1438 !strcmp(u.GetHost(), "localhost.localdomain")) {
1440 of->SetDir(u.GetUrl(kTRUE));
1441 }
1442 of->Print();
1443 }
1444 }
1445 }
1446
1448
1450 if (fSelector->Version() == 0) {
1451 PDB(kLoop,1) Info("Process","Call Terminate()");
1453 } else {
1454 PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1457 PDB(kLoop,1) Info("Process","Call Terminate()");
1459 }
1460 }
1461 }
1462
1463 // Add Selector status in the output list so it can be returned to the client as done
1464 // by Tree::Process (see ROOT-748). The status from the various workers will be added.
1465 fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1466
1467 if (gProofServ && !gProofServ->IsParallel()) { // put all the canvases onto the output list
1468 TIter nxc(gROOT->GetListOfCanvases());
1469 while (TObject *c = nxc())
1470 fOutput->Add(c);
1471 }
1472 }
1473
1474 if (gProofServ)
1476
1477 return 0;
1478}
1479
1480////////////////////////////////////////////////////////////////////////////////
1481/// Process specified TDSet on PROOF worker with TSelector object
1482/// The return value is -1 in case of error and TSelector::GetStatus()
1483/// in case of success.
1484
1486 Option_t *option, Long64_t nentries,
1488{
1489 if (!selector) {
1490 Error("Process", "selector object undefiend!");
1491 return -1;
1492 }
1493
1495 fSelector = selector;
1497 return Process(dset, (const char *)0, option, nentries, first);
1498}
1499
1500////////////////////////////////////////////////////////////////////////////////
1501/// Not implemented: meaningful only in the remote player. Returns kFALSE.
1502
1504{
1505 return kFALSE;
1506}
1507
1508////////////////////////////////////////////////////////////////////////////////
1509/// Check the memory usage, if requested.
1510/// Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
1511/// resident limits are depassed.
1512
1514 Bool_t &w80v, TString &wmsg)
1515{
1517 if (mfreq > 0 && processed%mfreq == 0) {
1518 // Record the memory information
1519 ProcInfo_t pi;
1520 if (!gSystem->GetProcInfo(&pi)){
1521 wmsg = "";
1522 if (gProofServ)
1523 Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
1524 pi.fMemVirtual, pi.fMemResident, processed);
1525 // Save info in TStatus
1526 fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1527 // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
1528 if (TProofServ::GetVirtMemMax() > 0) {
1529 if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
1530 wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
1531 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1532 return kFALSE;
1533 } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1534 // Refine monitoring
1535 mfreq = 1;
1536 wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
1537 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1538 w80v = kFALSE;
1539 }
1540 }
1541 // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
1542 if (TProofServ::GetResMemMax() > 0) {
1543 if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
1544 wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
1545 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1546 return kFALSE;
1547 } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1548 // Refine monitoring
1549 mfreq = 1;
1550 if (wmsg.Length() > 0) {
1551 wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1552 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1553 } else {
1554 wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
1555 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1556 }
1557 w80r = kFALSE;
1558 }
1559 }
1560 // In saving-partial-results mode flag the saving regime when reached to save expensive calls
1561 // to TSystem::GetProcInfo in SavePartialResults
1562 if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1563 }
1564 }
1565 // Done
1566 return kTRUE;
1567}
1568
1569////////////////////////////////////////////////////////////////////////////////
1570/// Finalize query (may not be used in this class).
1571
1573{
1574 MayNotUse("Finalize");
1575 return -1;
1576}
1577
1578////////////////////////////////////////////////////////////////////////////////
1579/// Finalize query (may not be used in this class).
1580
1582{
1583 MayNotUse("Finalize");
1584 return -1;
1585}
1586////////////////////////////////////////////////////////////////////////////////
1587/// Merge output (may not be used in this class).
1588
1590{
1591 MayNotUse("MergeOutput");
1592 return;
1593}
1594
1595////////////////////////////////////////////////////////////////////////////////
1596
1598{
1600 fOutput->Add(olsdm);
1601}
1602
1603////////////////////////////////////////////////////////////////////////////////
1604/// Update automatic binning parameters for given object "name".
1605
1609 Double_t& zmin, Double_t& zmax)
1610{
1611 if ( fAutoBins == 0 ) {
1612 fAutoBins = new THashList;
1613 }
1614
1615 TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1616
1617 if ( val == 0 ) {
1618 //look for info in higher master
1619 if (gProofServ && !gProofServ->IsTopMaster()) {
1620 TString key = name;
1622 }
1623
1624 val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1625 fAutoBins->Add(val);
1626 } else {
1627 val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1628 }
1629}
1630
1631////////////////////////////////////////////////////////////////////////////////
1632/// Get next packet (may not be used in this class).
1633
1635{
1636 MayNotUse("GetNextPacket");
1637 return 0;
1638}
1639
1640////////////////////////////////////////////////////////////////////////////////
1641/// Set up feedback (may not be used in this class).
1642
1644{
1645 MayNotUse("SetupFeedback");
1646}
1647
1648////////////////////////////////////////////////////////////////////////////////
1649/// Stop feedback (may not be used in this class).
1650
1652{
1653 MayNotUse("StopFeedback");
1654}
1655
1656////////////////////////////////////////////////////////////////////////////////
1657/// Draw (may not be used in this class).
1658
1659Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
1660 const char * /*selection*/, Option_t * /*option*/,
1661 Long64_t /*nentries*/, Long64_t /*firstentry*/)
1662{
1663 MayNotUse("DrawSelect");
1664 return -1;
1665}
1666
1667////////////////////////////////////////////////////////////////////////////////
1668/// Handle tree header request.
1669
1671{
1672 MayNotUse("HandleGetTreeHeader|");
1673}
1674
1675////////////////////////////////////////////////////////////////////////////////
1676/// Receive histo from slave.
1677
1679{
1680 TObject *obj = mess->ReadObject(mess->GetClass());
1681 if (obj->InheritsFrom(TH1::Class())) {
1682 TH1 *h = (TH1*)obj;
1683 h->SetDirectory(0);
1684 TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1685 if (horg)
1686 horg->Add(h);
1687 else
1688 h->SetDirectory(gDirectory);
1689 }
1690}
1691
1692////////////////////////////////////////////////////////////////////////////////
1693/// Draw the object if it is a canvas.
1694/// Return 0 in case of success, 1 if it is not a canvas or libProofDraw
1695/// is not available.
1696
1698{
1699 static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1700
1701 // Load the library the first time
1702 if (!gDrawCanvasHook) {
1703 // Load library needed for graphics ...
1704 TString drawlib = "libProofDraw";
1705 char *p = 0;
1706 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1707 delete[] p;
1708 if (gSystem->Load(drawlib) != -1) {
1709 // Locate DrawCanvas
1710 Func_t f = 0;
1711 if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
1712 gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1713 else
1714 Warning("DrawCanvas", "can't find DrawCanvas");
1715 } else
1716 Warning("DrawCanvas", "can't load %s", drawlib.Data());
1717 } else
1718 Warning("DrawCanvas", "can't locate %s", drawlib.Data());
1719 }
1720 if (gDrawCanvasHook && obj)
1721 return (*gDrawCanvasHook)(obj);
1722 // No drawing hook or object undefined
1723 return 1;
1724}
1725
1726////////////////////////////////////////////////////////////////////////////////
1727/// Parse the arguments from var, sel and opt and fill the selector and
1728/// object name accordingly.
1729/// Return 0 in case of success, 1 if libProofDraw is not available.
1730
1731Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
1732 TString &selector, TString &objname)
1733{
1734 static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
1735 TString &, TString &) = 0;
1736
1737 // Load the library the first time
1738 if (!gGetDrawArgsHook) {
1739 // Load library needed for graphics ...
1740 TString drawlib = "libProofDraw";
1741 char *p = 0;
1742 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1743 delete[] p;
1744 if (gSystem->Load(drawlib) != -1) {
1745 // Locate GetDrawArgs
1746 Func_t f = 0;
1747 if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
1748 gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
1749 TString &, TString &))(f);
1750 else
1751 Warning("GetDrawArgs", "can't find GetDrawArgs");
1752 } else
1753 Warning("GetDrawArgs", "can't load %s", drawlib.Data());
1754 } else
1755 Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
1756 }
1757 if (gGetDrawArgsHook)
1758 return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1759 // No parser hook or object undefined
1760 return 1;
1761}
1762
1763////////////////////////////////////////////////////////////////////////////////
1764/// Create/destroy a named canvas for feedback
1765
1767{
1768 static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
1769
1770 // Load the library the first time
1771 if (!gFeedBackCanvasHook) {
1772 // Load library needed for graphics ...
1773 TString drawlib = "libProofDraw";
1774 char *p = 0;
1775 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1776 delete[] p;
1777 if (gSystem->Load(drawlib) != -1) {
1778 // Locate FeedBackCanvas
1779 Func_t f = 0;
1780 if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
1781 gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
1782 else
1783 Warning("FeedBackCanvas", "can't find FeedBackCanvas");
1784 } else
1785 Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
1786 } else
1787 Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
1788 }
1789 if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1790 // No parser hook or object undefined
1791 return;
1792}
1793
1794////////////////////////////////////////////////////////////////////////////////
1795/// Return the size in bytes of the cache
1796
1798{
1799 if (fEvIter) return fEvIter->GetCacheSize();
1800 return -1;
1801}
1802
1803////////////////////////////////////////////////////////////////////////////////
1804/// Return the number of entries in the learning phase
1805
1807{
1808 if (fEvIter) return fEvIter->GetLearnEntries();
1809 return -1;
1810}
1811
1812////////////////////////////////////////////////////////////////////////////////
1813/// Switch on/off merge timer
1814
1816{
1817 if (on) {
1818 if (!fMergeSTW) fMergeSTW = new TStopwatch();
1819 PDB(kGlobal,1)
1820 Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1821 if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1823 } else if (fMergeSTW) {
1824 fMergeSTW->Stop();
1825 Float_t rt = fMergeSTW->RealTime();
1826 PDB(kGlobal,1)
1827 Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1828 if (fQuery) {
1830 // On the master (or in Lite()) we set the merging time and the numebr of mergers
1831 fQuery->SetMergeTime(rt);
1833 } else {
1834 // In a standard client we save the transfer-to-client time
1835 fQuery->SetRecvTime(rt);
1836 }
1837 PDB(kGlobal,2) fQuery->Print("F");
1838 }
1839 }
1840}
1841
1842//------------------------------------------------------------------------------
1843
1845
1846////////////////////////////////////////////////////////////////////////////////
1847/// Process the specified TSelector object 'nentries' times.
1848/// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1849/// local session.
1850/// The return value is -1 in case of error and TSelector::GetStatus()
1851/// in case of success.
1852
1854 Long64_t nentries, Option_t *option)
1855{
1856 if (!selector) {
1857 Error("Process", "selector object undefiend!");
1858 return -1;
1859 }
1860
1861 TDSetProxy *set = new TDSetProxy("", "", "");
1862 set->SetBit(TDSet::kEmpty);
1863 set->SetBit(TDSet::kIsLocal);
1864 Long64_t rc = Process(set, selector, option, nentries);
1865 SafeDelete(set);
1866
1867 // Done
1868 return rc;
1869}
1870
1871////////////////////////////////////////////////////////////////////////////////
1872/// Process the specified TSelector file 'nentries' times.
1873/// Used to test the PROOF interator mechanism for cycle-driven selectors in a
1874/// local session.
1875/// Process specified TDSet on PROOF worker with TSelector object
1876/// The return value is -1 in case of error and TSelector::GetStatus()
1877/// in case of success.
1878
1880 Long64_t nentries, Option_t *option)
1881{
1882 TDSetProxy *set = new TDSetProxy("", "", "");
1883 set->SetBit(TDSet::kEmpty);
1884 set->SetBit(TDSet::kIsLocal);
1885 Long64_t rc = Process(set, selector, option, nentries);
1886 SafeDelete(set);
1887
1888 // Done
1889 return rc;
1890}
1891
1892
1893//------------------------------------------------------------------------------
1894
1896
1897////////////////////////////////////////////////////////////////////////////////
1898/// Destructor.
1899
1901{
1902 SafeDelete(fOutput); // owns the output list
1904
1905 // Objects stored in maps are already deleted when merging the feedback
1908
1910}
1911
1912////////////////////////////////////////////////////////////////////////////////
1913/// Init the packetizer
1914/// Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
1915
1917 Long64_t first, const char *defpackunit,
1918 const char *defpackdata)
1919{
1921 PDB(kGlobal,1) Info("Process","Enter");
1922 fDSet = dset;
1924
1925 // This is done here to pickup on the fly changes
1926 Int_t honebyone = 1;
1927 if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1928 honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1929 fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1930
1931 Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1932
1933 TString packetizer;
1934 TList *listOfMissingFiles = 0;
1935
1936 TMethodCall callEnv;
1937 TClass *cl;
1938 noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1939
1940 if (noData) {
1941
1942 if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
1943 packetizer = defpackunit;
1944 else
1945 Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
1946
1947 // Get linked to the related class
1948 cl = TClass::GetClass(packetizer);
1949 if (cl == 0) {
1950 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1952 return -1;
1953 }
1954
1955 // Init the constructor
1956 callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
1957 if (!callEnv.IsValid()) {
1958 Error("InitPacketizer",
1959 "cannot find correct constructor for '%s'", cl->GetName());
1961 return -1;
1962 }
1963 callEnv.ResetParam();
1965 callEnv.SetParam((Long64_t) nentries);
1966 callEnv.SetParam((Long_t) fInput);
1967 callEnv.SetParam((Long_t) fProgressStatus);
1968
1969 } else if (dset->TestBit(TDSet::kMultiDSet)) {
1970
1971 // We have to process many datasets in one go, keeping them separate
1973 // We have been asked to stop
1974 Error("InitPacketizer", "received stop/abort request");
1976 return -1;
1977 }
1978
1979 // The multi packetizer
1980 packetizer = "TPacketizerMulti";
1981
1982 // Get linked to the related class
1983 cl = TClass::GetClass(packetizer);
1984 if (cl == 0) {
1985 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
1987 return -1;
1988 }
1989
1990 // Init the constructor
1991 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1992 if (!callEnv.IsValid()) {
1993 Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
1995 return -1;
1996 }
1997 callEnv.ResetParam();
1998 callEnv.SetParam((Long_t) dset);
2000 callEnv.SetParam((Long64_t) first);
2001 callEnv.SetParam((Long64_t) nentries);
2002 callEnv.SetParam((Long_t) fInput);
2003 callEnv.SetParam((Long_t) fProgressStatus);
2004
2005 // We are going to test validity during the packetizer initialization
2006 dset->SetBit(TDSet::kValidityChecked);
2007 dset->ResetBit(TDSet::kSomeInvalid);
2008
2009 } else {
2010
2011 // Lookup - resolve the end-point urls to optmize the distribution.
2012 // The lookup was previously called in the packetizer's constructor.
2013 // A list for the missing files may already have been added to the
2014 // output list; otherwise, if needed it will be created inside
2015 if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
2016 // Move it to the output list
2017 fInput->Remove(listOfMissingFiles);
2018 } else {
2019 listOfMissingFiles = new TList;
2020 }
2021 // Do the lookup; we only skip it if explicitly requested so.
2022 TString lkopt;
2023 if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
2024 dset->Lookup(kTRUE, &listOfMissingFiles);
2025
2027 // We have been asked to stop
2028 Error("InitPacketizer", "received stop/abort request");
2030 return -1;
2031 }
2032
2033 if (!(dset->GetListOfElements()) ||
2034 !(dset->GetListOfElements()->GetSize())) {
2035 if (gProofServ)
2036 gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
2037 Error("InitPacketizer", "No files from the data set were found - Aborting");
2039 if (listOfMissingFiles) {
2040 listOfMissingFiles->SetOwner();
2041 fOutput->Remove(listOfMissingFiles);
2042 SafeDelete(listOfMissingFiles);
2043 }
2044 return -1;
2045 }
2046
2047 if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
2048 // Using standard packetizer TAdaptivePacketizer
2049 packetizer = defpackdata;
2050 else
2051 Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
2052
2053 // Get linked to the related class
2054 cl = TClass::GetClass(packetizer);
2055 if (cl == 0) {
2056 Error("InitPacketizer", "class '%s' not found", packetizer.Data());
2058 return -1;
2059 }
2060
2061 // Init the constructor
2062 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2063 if (!callEnv.IsValid()) {
2064 Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
2066 return -1;
2067 }
2068 callEnv.ResetParam();
2069 callEnv.SetParam((Long_t) dset);
2071 callEnv.SetParam((Long64_t) first);
2072 callEnv.SetParam((Long64_t) nentries);
2073 callEnv.SetParam((Long_t) fInput);
2074 callEnv.SetParam((Long_t) fProgressStatus);
2075
2076 // We are going to test validity during the packetizer initialization
2077 dset->SetBit(TDSet::kValidityChecked);
2078 dset->ResetBit(TDSet::kSomeInvalid);
2079 }
2080
2081 // Get an instance of the packetizer
2082 Long_t ret = 0;
2083 callEnv.Execute(ret);
2084 if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2085 Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
2087 return -1;
2088 }
2089
2090 if (!fPacketizer->IsValid()) {
2091 Error("InitPacketizer",
2092 "instantiated packetizer object '%s' is invalid", cl->GetName());
2095 return -1;
2096 }
2097
2098 // In multi mode retrieve the list of missing files
2099 if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2100 if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
2101 // Remove it; it will be added to the output list
2102 fInput->Remove(listOfMissingFiles);
2103 }
2104 }
2105
2106 if (!noData) {
2107 // Add invalid elements to the list of missing elements
2108 TDSetElement *elem = 0;
2109 if (dset->TestBit(TDSet::kSomeInvalid)) {
2110 TIter nxe(dset->GetListOfElements());
2111 while ((elem = (TDSetElement *)nxe())) {
2112 if (!elem->GetValid()) {
2113 if (!listOfMissingFiles)
2114 listOfMissingFiles = new TList;
2115 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2116 dset->Remove(elem, kFALSE);
2117 }
2118 }
2119 // The invalid elements have been removed
2121 }
2122
2123 // Record the list of missing or invalid elements in the output list
2124 if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2125 TIter missingFiles(listOfMissingFiles);
2126 TString msg;
2127 if (gDebug > 0) {
2128 TFileInfo *fi = 0;
2129 while ((fi = (TFileInfo *) missingFiles.Next())) {
2130 if (fi->GetCurrentUrl()) {
2131 msg = Form("File not found: %s - skipping!",
2132 fi->GetCurrentUrl()->GetUrl());
2133 } else {
2134 msg = Form("File not found: %s - skipping!", fi->GetName());
2135 }
2137 }
2138 }
2139 // Make sure it will be sent back
2140 if (!GetOutput("MissingFiles")) {
2141 listOfMissingFiles->SetName("MissingFiles");
2142 AddOutputObject(listOfMissingFiles);
2143 }
2144 TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
2145 if (!tmpStatus) AddOutputObject((tmpStatus = new TStatus()));
2146
2147 // Estimate how much data are missing
2148 Int_t ngood = dset->GetListOfElements()->GetSize();
2149 Int_t nbad = listOfMissingFiles->GetSize();
2150 Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2151 msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2152 " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
2153 tmpStatus->Add(msg.Data());
2154 msg = Form(" +++\n"
2155 " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2156 " the 'MissingFiles' list\n"
2157 " +++", xb * 100., '%', nbad, nbad + ngood);
2159 } else {
2160 // Cleanup
2161 SafeDelete(listOfMissingFiles);
2162 }
2163 }
2164
2165 // Done
2166 return 0;
2167}
2168
2169////////////////////////////////////////////////////////////////////////////////
2170/// Process specified TDSet on PROOF.
2171/// This method is called on client and on the PROOF master.
2172/// The return value is -1 in case of an error and TSelector::GetStatus() in
2173/// in case of success.
2174
2175Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
2176 Option_t *option, Long64_t nentries,
2178{
2179 PDB(kGlobal,1) Info("Process", "Enter");
2180
2181 fDSet = dset;
2183
2184 if (!fProgressStatus) {
2185 Error("Process", "No progress status");
2186 return -1;
2187 }
2189
2190 // delete fOutput;
2191 if (!fOutput)
2192 fOutput = new THashList;
2193 else
2194 fOutput->Clear();
2195
2197
2198 if (fProof->IsMaster()){
2200 } else {
2202 }
2203
2204 TStopwatch elapsed;
2205
2206 // Define filename
2207 TString fn;
2208 fSelectorFileName = selector_file;
2209
2210 if (fCreateSelObj) {
2211 if(!SendSelector(selector_file)) return -1;
2212 fn = gSystem->BaseName(selector_file);
2213 } else {
2214 fn = selector_file;
2215 }
2216
2218
2219 // Parse option
2220 Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2221
2222 TList *inputtmp = 0; // List of temporary input objects
2223 TDSet *set = dset;
2224 if (fProof->IsMaster()) {
2225
2226 PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2227 set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2228 dset->GetDirectory() );
2229 if (dset->TestBit(TDSet::kEmpty))
2230 set->SetBit(TDSet::kEmpty);
2231
2232 if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2233 Error("Process", "cannot init the packetizer");
2235 return -1;
2236 }
2237
2238 // Reset start, this is now managed by the packetizer
2239 first = 0;
2240
2241 // Negative memlogfreq disable checks.
2242 // If 0 is passed we try to have 100 messages about memory
2243 // Otherwise we use the frequency passed.
2244 Int_t mrc = -1;
2245 Long64_t memlogfreq = -1, mlf;
2246 if (gSystem->Getenv("PROOF_MEMLOGFREQ")) {
2247 TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
2248 if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2249 }
2250 if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2251 if (memlogfreq == 0) {
2252 memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2253 if (memlogfreq <= 0) memlogfreq = 1;
2254 }
2255 if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
2256
2257
2258 // Send input data, if any
2259 TString emsg;
2260 if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2261 Warning("Process", "could not forward input data: %s", emsg.Data());
2262
2263 // Attach to the transient histogram with the assigned packets, if required
2264 if (fInput->FindObject("PROOF_StatsHist") != 0) {
2265 if (!(fProcPackets = (TH1I *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
2266 Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
2267 } else {
2268 PDB(kLoop,1)
2269 Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2270 " packets being processed");
2271 }
2272 }
2273
2274 } else {
2275
2276 // Check whether we have to enforce the use of submergers
2277 if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
2278 Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
2279 if (smg >= 0) {
2280 fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
2281 if (gEnv->Lookup("Proof.MergersByHost")) {
2282 Int_t mbh = gEnv->GetValue("Proof.MergersByHost",0);
2283 if (mbh != 0) {
2284 // Administrator settings have the priority
2285 TObject *o = 0;
2286 if ((o = fInput->FindObject("PROOF_MergersByHost"))) { fInput->Remove(o); delete o; }
2287 fInput->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
2288 }
2289 }
2290 }
2291 }
2292
2293 // For a new query clients should make sure that the temporary
2294 // output list is empty
2295 if (fOutputLists) {
2297 delete fOutputLists;
2298 fOutputLists = 0;
2299 }
2300
2301 if (!sync) {
2303 Printf(" ");
2304 Info("Process","starting new query");
2305 }
2306
2307 // Define fSelector in Client if processing with filename
2308 if (fCreateSelObj) {
2310 if (!(fSelector = TSelector::GetSelector(selector_file))) {
2311 if (!sync)
2313 return -1;
2314 }
2315 }
2316
2317 fSelectorClass = 0;
2318 fSelectorClass = fSelector->IsA();
2319
2320 // Add fSelector to inputlist if processing with object
2321 if (!fCreateSelObj) {
2322 // In any input list was set into the selector move it to the PROOF
2323 // input list, because we do not want to stream the selector one
2324 if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2326 TObject *o = 0;
2327 while ((o = nxi())) {
2328 if (!fInput->FindObject(o)) {
2329 fInput->Add(o);
2330 if (!inputtmp) {
2331 inputtmp = new TList;
2332 inputtmp->SetOwner(kFALSE);
2333 }
2334 inputtmp->Add(o);
2335 }
2336 }
2337 }
2339 }
2340 // Set the input list for initialization
2342 fSelector->SetOption(option);
2344
2345 PDB(kLoop,1) Info("Process","Call Begin(0)");
2346 fSelector->Begin(0);
2347
2348 // Reset the input list to avoid double streaming and related problems (saving
2349 // the TQueryResult)
2351
2352 // Send large input data objects, if any
2354
2355 if (!sync)
2357 }
2358
2359 TCleanup clean(this);
2360 SetupFeedback();
2361
2362 TString opt = option;
2363
2364 // Old servers need a dedicated streamer
2365 if (fProof->fProtocol < 13)
2366 dset->SetWriteV3(kTRUE);
2367
2368 // Workers will get the entry ranges from the packetizer
2371
2372 // Entry- or Event- list ?
2373 TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2374 : (TEntryList *)0;
2375 TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2376 : (TEventList *)0;
2377 if (fProof->fProtocol > 14) {
2378 if (fProcessMessage) delete fProcessMessage;
2380 mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381 (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2382 } else {
2383 mesg << set << fn << fInput << opt << num << fst << evl << sync;
2384 if (enl)
2385 // Not supported remotely
2386 Warning("Process","entry lists not supported by the server");
2387 }
2388
2389 // Reset the merging progress information
2391
2392 Int_t nb = fProof->Broadcast(mesg);
2393 PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2394 if (fProof->IsLite()) fProof->fNotIdle += nb;
2395
2396 // Reset streamer choice
2397 if (fProof->fProtocol < 13)
2398 dset->SetWriteV3(kFALSE);
2399
2400 // Redirect logs from master to special log frame
2401 if (IsClient())
2403
2404 if (!IsClient()){
2405 // Signal the start of finalize for the memory log grepping
2406 Info("Process|Svc", "Start merging Memory information");
2407 }
2408
2409 if (!sync) {
2410 if (IsClient()) {
2411 // Asynchronous query: just make sure that asynchronous input
2412 // is enabled and return the prompt
2413 PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2414 " activating CollectInputFrom");
2415 fProof->Activate();
2416
2417 // Receive the acknowledgement and query sequential number
2418 fProof->Collect();
2419
2420 return fProof->fSeqNum;
2421
2422 } else {
2423 PDB(kGlobal,1) Info("Process","Calling Collect");
2424 fProof->Collect();
2425
2426 HandleTimer(0); // force an update of final result
2427 // This forces a last call to TPacketizer::HandleTimer via the second argument
2428 // (the first is ignored). This is needed when some events were skipped so that
2429 // the total number of entries is not the one requested. The packetizer has no
2430 // way in such a case to understand that processing is finished: it must be told.
2431 if (fPacketizer) {
2433 // The progress timer will now stop itself at the next call
2435 // Store process info
2436 elapsed.Stop();
2437 if (fQuery)
2440 elapsed.RealTime());
2441 }
2442 StopFeedback();
2443
2444 return Finalize(kFALSE,sync);
2445 }
2446 } else {
2447
2448 PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2449 fProof->Collect();
2450 if (!(fProof->IsSync())) {
2451 // The server required to switch to asynchronous mode
2452 Info("Process", "switching to the asynchronous mode ...");
2453 return fProof->fSeqNum;
2454 }
2455
2456 // Restore prompt logging, for clients (Collect leaves things as they were
2457 // at the time it was called)
2458 if (IsClient())
2460
2461 if (!IsClient()) {
2462 // Force an update of final result
2463 HandleTimer(0);
2464 // This forces a last call to TPacketizer::HandleTimer via the second argument
2465 // (the first is ignored). This is needed when some events were skipped so that
2466 // the total number of entries is not the one requested. The packetizer has no
2467 // way in such a case to understand that processing is finished: it must be told.
2468 if (fPacketizer) {
2470 // The progress timer will now stop itself at the next call
2472 // Store process info
2473 if (fQuery)
2477 }
2478 } else {
2479 // Set the input list: maybe required at termination
2481 }
2482 StopFeedback();
2483
2484 Long64_t rc = -1;
2486 rc = Finalize(kFALSE,sync);
2487
2488 // Remove temporary input objects, if any
2489 if (inputtmp) {
2490 TIter nxi(inputtmp);
2491 TObject *o = 0;
2492 while ((o = nxi())) fInput->Remove(o);
2493 SafeDelete(inputtmp);
2494 }
2495
2496 // Done
2497 return rc;
2498 }
2499}
2500
2501////////////////////////////////////////////////////////////////////////////////
2502/// Process specified TDSet on PROOF.
2503/// This method is called on client and on the PROOF master.
2504/// The return value is -1 in case of an error and TSelector::GetStatus() in
2505/// in case of success.
2506
2508 Option_t *option, Long64_t nentries,
2510{
2511 if (!selector) {
2512 Error("Process", "selector object undefined");
2513 return -1;
2514 }
2515
2516 // Define fSelector in Client
2517 if (IsClient() && (selector != fSelector)) {
2519 fSelector = selector;
2520 }
2521
2523 Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2525
2526 // Done
2527 return rc;
2528}
2529
2530////////////////////////////////////////////////////////////////////////////////
2531/// Prepares the given list of new workers to join a progressing process.
2532/// Returns kTRUE on success, kFALSE otherwise.
2533
2535{
2536 if (!fProcessMessage || !fProof || !fPacketizer) {
2537 Error("Process", "Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2539 return kFALSE;
2540 }
2541
2542 if (!workers || !fProof->IsMaster()) {
2543 Error("Process", "Invalid call");
2544 return kFALSE;
2545 }
2546
2547 PDB(kGlobal, 1)
2548 Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2549
2550 // Sends the file associated to the TSelector, if necessary
2551 if (fCreateSelObj) {
2552 PDB(kGlobal, 2)
2553 Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2555 Error("Process", "Problems in sending selector file %s", fSelectorFileName.Data());
2556 return kFALSE;
2557 }
2558 }
2559
2560 if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2561
2562 PDB(kGlobal, 2)
2563 Info("Process", "Adding new workers to the packetizer");
2564 if (fPacketizer->AddWorkers(workers) == -1) {
2565 Error("Process", "Cannot add new workers to the packetizer!");
2566 return kFALSE; // TODO: make new wrks inactive
2567 }
2568
2569 PDB(kGlobal, 2)
2570 Info("Process", "Broadcasting process message to new workers");
2571 fProof->Broadcast(*fProcessMessage, workers);
2572
2573 // Don't call Collect(): we came here from a global Collect() already which
2574 // will take care of new workers as well
2575
2576 return kTRUE;
2577
2578}
2579
2580////////////////////////////////////////////////////////////////////////////////
2581/// Merge output in files
2582
2584{
2585 PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2586 PDB(kOutput,2) fOutput->ls();
2587
2588 TList *rmList = 0;
2589 if (fMergeFiles) {
2590 TIter nxo(fOutput);
2591 TObject *o = 0;
2592 TProofOutputFile *pf = 0;
2593 while ((o = nxo())) {
2594 if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2595
2596 PDB(kOutput,2) pf->Print();
2597
2598 if (pf->IsMerge()) {
2599
2600 // Point to the merger
2601 Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2602 TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2603 if (!filemerger) {
2604 Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
2605 pf->Print();
2606 continue;
2607 }
2608 // If only one instance the list in the merger is not yet created: do it now
2609 if (!pf->IsMerged()) {
2610 PDB(kOutput,2) pf->Print();
2611 TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2612 filemerger->AddFile(fileLoc);
2613 }
2614 // Datadir
2615 TString ddir, ddopts;
2616 if (gProofServ) {
2617 ddir.Form("%s/", gProofServ->GetDataDir());
2619 }
2620 // Set the output file
2621 TString outfile(pf->GetOutputFileName());
2622 if (outfile.Contains("<datadir>/")) {
2623 outfile.ReplaceAll("<datadir>/", ddir.Data());
2624 if (!ddopts.IsNull())
2625 outfile += TString::Format("?%s", ddopts.Data());
2626 pf->SetOutputFileName(outfile);
2627 }
2628 if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2630 TString srv;
2632 TUrl usrv(srv);
2633 Bool_t localFile = kFALSE;
2634 if (pf->IsRetrieve()) {
2635 // This file will be retrieved by the client: we created it in the data dir
2636 // and save the file URL on the client in the title
2637 if (outfile.BeginsWith("client:")) outfile.Replace(0, 7, "");
2638 TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2639 // The output file path on the master
2640 outfile.Form("%s%s", ddir.Data(), bn.Data());
2641 // Save the client path in the title if not defined yet
2642 if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2643 // The file is local
2644 localFile = kTRUE;
2645 } else {
2646 // Check if the file is on the master or elsewhere
2647 if (outfile.BeginsWith("master:")) outfile.Replace(0, 7, "");
2648 // Check locality
2649 TUrl uof(outfile.Data(), kTRUE);
2650 TString lfn;
2651 ftyp = TFile::GetType(uof.GetUrl(), "RECREATE", &lfn);
2652 if (ftyp == TFile::kLocal && !srv.IsNull()) {
2653 // Check if is a different server
2654 if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2655 usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2656 }
2657 // If it is really local set the file name
2658 if (ftyp == TFile::kLocal) outfile = lfn;
2659 // The file maybe local
2660 if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2661 }
2662 // The remote output file name (the one to be used by the client)
2663 TString outfilerem(outfile);
2664 // For local files we add the local server
2665 if (localFile) {
2666 // Remove prefix, if any, if included and if Xrootd
2667 TProofServ::FilterLocalroot(outfilerem, srv);
2668 outfilerem.Insert(0, srv);
2669 }
2670 // Save the new remote output filename
2671 pf->SetOutputFileName(outfilerem);
2672 // Align the filename
2673 pf->SetFileName(gSystem->BaseName(outfilerem));
2674 }
2675 if (!filemerger->OutputFile(outfile)) {
2676 Error("MergeOutputFiles", "cannot open the output file");
2677 continue;
2678 }
2679 // Merge
2680 PDB(kSubmerger,2) filemerger->PrintFiles("");
2681 if (!filemerger->Merge()) {
2682 Error("MergeOutputFiles", "cannot merge the output files");
2683 continue;
2684 }
2685 // Remove the files
2686 TList *fileList = filemerger->GetMergeList();
2687 if (fileList) {
2688 TIter next(fileList);
2689 TObjString *url = 0;
2690 while((url = (TObjString*)next())) {
2691 TUrl u(url->GetName());
2692 if (!strcmp(u.GetProtocol(), "file")) {
2693 gSystem->Unlink(u.GetFile());
2694 } else {
2695 gSystem->Unlink(url->GetName());
2696 }
2697 }
2698 }
2699 // Reset the merger
2700 filemerger->Reset();
2701
2702 } else {
2703
2704 // If not yet merged (for example when having only 1 active worker,
2705 // we need to create the dataset by calling Merge on an effectively empty list
2706 if (!pf->IsMerged()) {
2707 TList dumlist;
2708 dumlist.Add(new TNamed("dum", "dum"));
2709 dumlist.SetOwner(kTRUE);
2710 pf->Merge(&dumlist);
2711 }
2712 // Point to the dataset
2714 if (!fc) {
2715 Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
2716 pf->Print();
2717 continue;
2718 }
2719 // Add the collection to the output list for registration and/or to be returned
2720 // to the client
2721 fOutput->Add(fc);
2722 // Do not cleanup at destruction
2723 pf->ResetFileCollection();
2724 // Tell the main thread to register this dataset, if needed
2725 if (pf->IsRegister()) {
2726 TString opt;
2727 if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
2728 if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
2729 if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
2730 fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
2731 TString tag = TString::Format("DATASET_%s", pf->GetTitle());
2732 fOutput->Add(new TNamed(tag, opt));
2733 }
2734 // Remove this object from the output list and schedule it for distruction
2735 fOutput->Remove(pf);
2736 if (!rmList) rmList = new TList;
2737 rmList->Add(pf);
2738 PDB(kOutput,2) fOutput->Print();
2739 }
2740 }
2741 }
2742 }
2743
2744 // Remove objects scheduled for removal
2745 if (rmList && rmList->GetSize() > 0) {
2746 TIter nxo(rmList);
2747 TObject *o = 0;
2748 while((o = nxo())) {
2749 fOutput->Remove(o);
2750 }
2751 rmList->SetOwner(kTRUE);
2752 delete rmList;
2753 }
2754
2755 PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2756
2757 // Done
2758 return kTRUE;
2759}
2760
2761
2762////////////////////////////////////////////////////////////////////////////////
2763/// Set the selector's data members:
2764/// find the mapping of data members to otuput list entries in the output list
2765/// and apply it.
2766
2768{
2771 if (!olsdm) {
2772 PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2773 "failed to find map object in output list!");
2774 return;
2775 }
2776
2777 olsdm->SetDataMembers(fSelector);
2778}
2779
2780////////////////////////////////////////////////////////////////////////////////
2781
2783{
2784 // Finalize a query.
2785 // Returns -1 in case of an error, 0 otherwise.
2786
2787 if (IsClient()) {
2788 if (fOutputLists == 0) {
2789 if (force)
2790 if (fQuery)
2791 return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
2792 fQuery->GetName()), force);
2793 } else {
2794 // Make sure the all objects are in the output list
2795 PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2796 MergeOutput();
2797 }
2798 }
2799
2800 Long64_t rv = 0;
2801 if (fProof->IsMaster()) {
2802
2803 // Fill information for monitoring and stop it
2804 TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
2805 if (!status) {
2806 // The query was aborted: let's add some info in the output list
2807 status = new TStatus();
2808 fOutput->Add(status);
2809 TString emsg = TString::Format("Query aborted after %lld entries", GetEventsProcessed());
2810 status->Add(emsg);
2811 }
2812 status->SetExitStatus((Int_t) GetExitStatus());
2813
2814 PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2815 // Some objects (e.g. histos in autobin) may not have been merged yet
2816 // do it now
2817 MergeOutput();
2818
2819 fOutput->SetOwner();
2820
2821 // Add the active-wrks-vs-proctime info from the packetizer
2822 if (fPacketizer) {
2824 if (pperf) fOutput->Add(pperf);
2826 if (parms) {
2827 TIter nxo(parms);
2828 TObject *o = 0;
2829 while ((o = nxo())) fOutput->Add(o);
2830 }
2831
2832 // If other invalid elements were found during processing, add them to the
2833 // list of missing elements
2834 TDSetElement *elem = 0;
2837 TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
2838 if (!listOfMissingFiles) {
2839 listOfMissingFiles = new TList;
2840 listOfMissingFiles->SetName("MissingFiles");
2841 }
2843 while ((elem = (TDSetElement *)nxe()))
2844 listOfMissingFiles->Add(elem->GetFileInfo(type));
2845 if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2846 }
2847 }
2848
2850 // Save memory usage on master
2851 Long_t vmaxmst, rmaxmst;
2852 TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2853 status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2854
2856
2857 } else {
2858 if (fExitStatus != kAborted) {
2859
2860 if (!sync) {
2861 // Reinit selector (with multi-sessioning we must do this until
2862 // TSelector::GetSelector() is optimized to i) avoid reloading of an
2863 // unchanged selector and ii) invalidate existing instances of
2864 // reloaded selector)
2865 if (ReinitSelector(fQuery) == -1) {
2866 Info("Finalize", "problems reinitializing selector \"%s\"",
2867 fQuery->GetSelecImp()->GetName());
2868 return -1;
2869 }
2870 }
2871
2872 if (fPacketizer)
2873 if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2875 failedPackets->SetName("FailedPackets");
2876 AddOutputObject(failedPackets);
2877
2878 TStatus *status = (TStatus *)GetOutput("PROOF_Status");
2879 if (!status) AddOutputObject((status = new TStatus()));
2880 status->Add("Some packets were not processed! Check the the"
2881 " 'FailedPackets' list in the output list");
2882 }
2883
2884 // Some input parameters may be needed in Terminate
2886
2888 if (output) {
2889 TIter next(fOutput);
2890 while(TObject* obj = next()) {
2891 if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2892 // Either parallel or not a canvas or not able to display it:
2893 // just add to the list
2894 output->Add(obj);
2895 }
2896 } else {
2897 Warning("Finalize", "undefined output list in the selector! Protocol error?");
2898 }
2899
2900 // We need to do this because the output list can be modified in TSelector::Terminate
2901 // in a way to invalidate existing objects; so we clean the links when still valid and
2902 // we re-copy back later
2904 fOutput->Clear("nodelete");
2905
2906 // Map output objects to selector members
2908
2909 PDB(kLoop,1) Info("Finalize","Call Terminate()");
2910 // This is the end of merging
2912 // We measure the merge time
2914 // Call Terminate now
2916
2917 rv = fSelector->GetStatus();
2918
2919 // Copy the output list back and clean the selector's list
2920 TIter it(output);
2921 while(TObject* o = it()) {
2922 fOutput->Add(o);
2923 }
2924
2925 // Save the output list in the current query, if any
2926 if (fQuery) {
2928 // Set in finalized state (cannot be done twice)
2930 } else {
2931 Warning("Finalize","current TQueryResult object is undefined!");
2932 }
2933
2934 if (!fCreateSelObj) {
2937 if (output) output->Remove(fSelector);
2938 fSelector = 0;
2939 }
2940
2941 // We have transferred copy of the output objects in TQueryResult,
2942 // so now we can cleanup the selector, making sure that we do not
2943 // touch the output objects
2944 if (output) { output->SetOwner(kFALSE); output->Clear("nodelete"); }
2946
2947 // Delete fOutput (not needed anymore, cannot be finalized twice),
2948 // making sure that the objects saved in TQueryResult are not deleted
2950 fOutput->Clear("nodelete");
2952
2953 } else {
2954
2955 // Cleanup
2956 fOutput->SetOwner();
2958 if (!fCreateSelObj) fSelector = 0;
2959 }
2960 }
2961 PDB(kGlobal,1) Info("Process","exit");
2962
2963 if (!IsClient()) {
2964 Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
2965 }
2967
2968 return rv;
2969}
2970
2971////////////////////////////////////////////////////////////////////////////////
2972/// Finalize the results of a query already processed.
2973
2975{
2976 PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2977
2978 if (!IsClient()) {
2979 Info("Finalize(TQueryResult *)",
2980 "method to be executed only on the clients");
2981 return -1;
2982 }
2983
2984 if (!qr) {
2985 Info("Finalize(TQueryResult *)", "query undefined");
2986 return -1;
2987 }
2988
2989 if (qr->IsFinalized()) {
2990 Info("Finalize(TQueryResult *)", "query already finalized");
2991 return -1;
2992 }
2993
2994 // Reset the list
2995 if (!fOutput)
2996 fOutput = new THashList;
2997 else
2998 fOutput->Clear();
2999
3000 // Make sure that the temporary output list is empty
3001 if (fOutputLists) {
3003 delete fOutputLists;
3004 fOutputLists = 0;
3005 }
3006
3007 // Re-init the selector
3009
3010 // Import the output list
3011 TList *tmp = (TList *) qr->GetOutputList();
3012 if (!tmp) {
3014 Info("Finalize(TQueryResult *)", "outputlist is empty");
3015 return -1;
3016 }
3017 TList *out = fOutput;
3018 if (fProof->fProtocol < 11)
3019 out = new TList;
3020 TIter nxo(tmp);
3021 TObject *o = 0;
3022 while ((o = nxo()))
3023 out->Add(o->Clone());
3024
3025 // Adopts the list
3026 if (fProof->fProtocol < 11) {
3027 out->SetOwner();
3028 StoreOutput(out);
3029 }
3031
3033
3034 // Finalize it
3035 SetCurrentQuery(qr);
3036 Long64_t rc = Finalize();
3038
3039 return rc;
3040}
3041
3042////////////////////////////////////////////////////////////////////////////////
3043/// Send the selector file(s) to master or worker nodes.
3044
3046{
3047 // Check input
3048 if (!selector_file) {
3049 Info("SendSelector", "Invalid input: selector (file) name undefined");
3050 return kFALSE;
3051 }
3052
3053 if (!strchr(gSystem->BaseName(selector_file), '.')) {
3054 if (gDebug > 1)
3055 Info("SendSelector", "selector name '%s' does not contain a '.':"
3056 " nothing to send, it will be loaded from a library", selector_file);
3057 return kTRUE;
3058 }
3059
3060 // Extract the fine name first
3061 TString selec = selector_file;
3062 TString aclicMode;
3063 TString arguments;
3064 TString io;
3065 selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3066
3067 // Expand possible envs or '~'
3068 gSystem->ExpandPathName(selec);
3069
3070 // Update the macro path
3072 TString np(gSystem->DirName(selec));
3073 if (!np.IsNull()) {
3074 np += ":";
3075 if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
3076 Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
3077 mp.Insert(ip, np);
3079 if (gDebug > 0)
3080 Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
3081 }
3082 }
3083
3084 // Header file
3085 TString header = selec;
3086 header.Remove(header.Last('.'));
3087 header += ".h";
3088 if (gSystem->AccessPathName(header, kReadPermission)) {
3089 TString h = header;
3090 header.Remove(header.Last('.'));
3091 header += ".hh";
3092 if (gSystem->AccessPathName(header, kReadPermission)) {
3093 Info("SendSelector",
3094 "header file not found: tried: %s %s", h.Data(), header.Data());
3095 return kFALSE;
3096 }
3097 }
3098
3099 // Send files now
3101 Info("SendSelector", "problems sending implementation file %s", selec.Data());
3102 return kFALSE;
3103 }
3104 if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3105 Info("SendSelector", "problems sending header file %s", header.Data());
3106 return kFALSE;
3107 }
3108
3109 return kTRUE;
3110}
3111
3112////////////////////////////////////////////////////////////////////////////////
3113/// Merge objects in output the lists.
3114
3116{
3117 PDB(kOutput,1) Info("MergeOutput","Enter");
3118
3119 TObject *obj = 0;
3120 if (fOutputLists) {
3121
3122 TIter next(fOutputLists);
3123
3124 TList *list;
3125 while ( (list = (TList *) next()) ) {
3126
3127 if (!(obj = fOutput->FindObject(list->GetName()))) {
3128 obj = list->First();
3129 list->Remove(obj);
3130 fOutput->Add(obj);
3131 }
3132
3133 if ( list->IsEmpty() ) continue;
3134
3135 TMethodCall callEnv;
3136 if (obj->IsA())
3137 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3138 if (callEnv.IsValid()) {
3139 callEnv.SetParam((Long_t) list);
3140 callEnv.Execute(obj);
3141 } else {
3142 // No Merge interface, return individual objects
3143 while ( (obj = list->First()) ) {
3144 fOutput->Add(obj);
3145 list->Remove(obj);
3146 }
3147 }
3148 }
3150
3151 } else {
3152
3153 PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3154 }
3155
3156 if (!IsClient() || fProof->IsLite()) {
3157 // Merge the output files created on workers, if any
3159 }
3160
3161 // If there are TProofOutputFile objects we have to make sure that the internal
3162 // information is consistent for the cases where this object is going to be merged
3163 // again (e.g. when using submergers or in a multi-master setup). This may not be
3164 // the case because the first coming in is taken as reference and it has the
3165 // internal dir and raw dir of the originating worker.
3166 TString key;
3167 TNamed *nm = 0;
3168 TList rmlist;
3169 TIter nxo(fOutput);
3170 while ((obj = nxo())) {
3171 TProofOutputFile *pf = dynamic_cast<TProofOutputFile *>(obj);
3172 if (pf) {
3173 if (gProofServ) {
3174 PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3175 TString dir(pf->GetOutputFileName());
3176 PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3177 // The dir
3178 if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3179 PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3180 pf->SetDir(dir);
3181 // The raw dir; for xrootd based system we include the 'localroot', if any
3182 TUrl u(dir);
3183 dir = u.GetFile();
3184 TString pfx = gEnv->GetValue("Path.Localroot","");
3185 if (!pfx.IsNull() &&
3186 (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3187 dir.Insert(0, pfx);
3188 PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3189 pf->SetDir(dir, kTRUE);
3190 // The worker ordinal
3192 // The saved output file name, if any
3193 key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3194 if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3195 pf->SetOutputFileName(nm->GetTitle());
3196 rmlist.Add(nm);
3198 pf->SetOutputFileName(0);
3200 }
3201 // The filename (order is important to exclude '.merger' from the key)
3202 dir = pf->GetFileName();
3204 dir += ".merger";
3205 pf->SetMerged(kFALSE);
3206 } else {
3207 if (dir.EndsWith(".merger")) dir.Remove(dir.Last('.'));
3208 }
3209 pf->SetFileName(dir);
3210 } else if (fProof->IsLite()) {
3211 // The ordinal
3212 pf->SetWorkerOrdinal("0");
3213 // The dir
3215 // The filename and raw dir
3216 TUrl u(pf->GetOutputFileName(), kTRUE);
3218 pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
3219 // Notify the output path
3220 Printf("\nOutput file: %s", pf->GetOutputFileName());
3221 }
3222 } else {
3223 PDB(kOutput,2) Info("MergeOutput","output object '%s' is not a TProofOutputFile", obj->GetName());
3224 }
3225 }
3226
3227 // Remove temporary objects from fOutput
3228 if (rmlist.GetSize() > 0) {
3229 TIter nxrm(&rmlist);
3230 while ((obj = nxrm()))
3231 fOutput->Remove(obj);
3232 rmlist.SetOwner(kTRUE);
3233 }
3234
3235 // If requested (typically in case of submerger to count possible side-effects in that process)
3236 // save the measured memory usage
3237 if (saveMemValues) {
3239 // Save memory usage on master
3240 Long_t vmaxmst, rmaxmst;
3241 TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3242 TStatus *status = (TStatus *) fOutput->FindObject("PROOF_Status");
3243 if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3244 }
3245
3246 PDB(kOutput,1) fOutput->Print();
3247 PDB(kOutput,1) Info("MergeOutput","leave (%d object(s))", fOutput->GetSize());
3248}
3249
3250////////////////////////////////////////////////////////////////////////////////
3251/// Progress signal.
3252
3254{
3255 if (IsClient()) {
3256 fProof->Progress(total, processed);
3257 } else {
3258 // Send to the previous tier
3260 m << total << processed;
3262 }
3263}
3264
3265////////////////////////////////////////////////////////////////////////////////
3266/// Progress signal.
3267
3269 Long64_t bytesread,
3270 Float_t initTime, Float_t procTime,
3271 Float_t evtrti, Float_t mbrti)
3272{
3273 PDB(kGlobal,1)
3274 Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3275 initTime, procTime, evtrti, mbrti);
3276
3277 if (IsClient()) {
3278 fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3279 } else {
3280 // Send to the previous tier
3282 m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3284 }
3285}
3286
3287////////////////////////////////////////////////////////////////////////////////
3288/// Progress signal.
3289
3291{
3292 if (pi) {
3293 PDB(kGlobal,1)
3294 Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3295 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3296 pi->fActWorkers, pi->fEffSessions);
3297
3298 if (IsClient()) {
3299 fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3300 pi->fInitTime, pi->fProcTime,
3301 pi->fEvtRateI, pi->fMBRateI,
3302 pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3303 } else {
3304 // Send to the previous tier
3306 m << pi;
3308 }
3309 } else {
3310 Warning("Progress","TProofProgressInfo object undefined!");
3311 }
3312}
3313
3314
3315////////////////////////////////////////////////////////////////////////////////
3316/// Feedback signal.
3317
3319{
3320 fProof->Feedback(objs);
3321}
3322
3323////////////////////////////////////////////////////////////////////////////////
3324/// Stop process after this event.
3325
3327{
3328 if (fPacketizer != 0)
3330 if (abort == kTRUE)
3332 else
3334}
3335
3336////////////////////////////////////////////////////////////////////////////////
3337/// Incorporate the received object 'obj' into the output list fOutput.
3338/// The latter is created if not existing.
3339/// This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
3340/// consumption.
3341/// Returns -1 in case of error, 1 if the object has been merged into another
3342/// one (so that its ownership has not been taken and can be deleted), and 0
3343/// otherwise.
3344
3346{
3347 PDB(kOutput,1)
3348 Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3349
3350 // We must something to process
3351 if (!obj) {
3352 PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3353 return -1;
3354 }
3355
3356 // Create the output list, if not yet done
3357 if (!fOutput)
3358 fOutput = new THashList;
3359
3360 // Flag about merging
3361 Bool_t merged = kTRUE;
3362
3363 // Process event lists first
3364 TList *elists = dynamic_cast<TList *> (obj);
3365 if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3366
3367 // Create a global event list, result of merging the event lists
3368 // coresponding to the various data set elements
3369 TEventList *evlist = new TEventList("PROOF_EventList");
3370
3371 // Iterate the list of event list segments
3372 TIter nxevl(elists);
3373 TEventList *evl = 0;
3374 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3375
3376 // Find the file offset (fDSet is the current TDSet instance)
3377 // locating the element by name
3378 TIter nxelem(fDSet->GetListOfElements());
3379 TDSetElement *elem = 0;
3380 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3381 if (!strcmp(elem->GetFileName(), evl->GetName()))
3382 break;
3383 }
3384 if (!elem) {
3385 Error("AddOutputObject", "Found an event list for %s, but no object with"
3386 " the same name in the TDSet", evl->GetName());
3387 continue;
3388 }
3389 Long64_t offset = elem->GetTDSetOffset();
3390
3391 // Shift the list by the number of first event in that file
3392 Long64_t *arr = evl->GetList();
3393 Int_t num = evl->GetN();
3394 if (arr && offset > 0)
3395 for (Int_t i = 0; i < num; i++)
3396 arr[i] += offset;
3397
3398 // Add to the global event list
3399 evlist->Add(evl);
3400 }
3401
3402 // Incorporate the resulting global list in fOutput
3403 SetLastMergingMsg(evlist);
3404 Incorporate(evlist, fOutput, merged);
3405 NotifyMemory(evlist);
3406
3407 // Delete the global list if merged
3408 if (merged)
3409 SafeDelete(evlist);
3410
3411 // The original object has been transformed in something else; we do
3412 // not have ownership on it
3413 return 1;
3414 }
3415
3416 // Check if we need to merge files
3417 TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
3418 if (pf) {
3420 if (!IsClient() || fProof->IsLite()) {
3421 if (pf->IsMerge()) {
3422 Bool_t hasfout = (pf->GetOutputFileName() &&
3423 strlen(pf->GetOutputFileName()) > 0 &&
3425 Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3426 if (setfout) {
3427
3428 TString ddir, ddopts;
3429 if (gProofServ) {
3430 ddir.Form("%s/", gProofServ->GetDataDir());
3432 }
3433 // Set the output file
3434 TString outfile(pf->GetOutputFileName());
3435 outfile.ReplaceAll("<datadir>/", ddir.Data());
3436 if (!ddopts.IsNull()) outfile += TString::Format("?%s", ddopts.Data());
3437 pf->SetOutputFileName(outfile);
3438
3439 if (gProofServ) {
3440 // If submerger, save first the existing filename, if any
3442 TString key = TString::Format("PROOF_OutputFileName_%s", pf->GetFileName());
3443 if (!fOutput->FindObject(key.Data()))
3444 fOutput->Add(new TNamed(key.Data(), pf->GetOutputFileName()));
3445 }
3446 TString of;
3448 if (of.IsNull()) {
3449 // Assume an xroot server running on the machine
3450 of.Form("root://%s/", gSystem->HostName());
3451 if (gSystem->Getenv("XRDPORT")) {
3452 TString sp(gSystem->Getenv("XRDPORT"));
3453 if (sp.IsDigit())
3454 of.Form("root://%s:%s/", gSystem->HostName(), sp.Data());
3455 }
3456 }
3457 TString sessionPath(gProofServ->GetSessionDir());
3458 TProofServ::FilterLocalroot(sessionPath, of);
3459 of += TString::Format("%s/%s", sessionPath.Data(), pf->GetFileName());
3461 if (!of.EndsWith(".merger")) of += ".merger";
3462 } else {
3463 if (of.EndsWith(".merger")) of.Remove(of.Last('.'));
3464 }
3465 pf->SetOutputFileName(of);
3466 }
3467 }
3468 // Notify
3469 PDB(kOutput, 1) pf->Print();
3470 }
3471 } else {
3472 // On clients notify the output path
3473 Printf("Output file: %s", pf->GetOutputFileName());
3474 }
3475 }
3476
3477 // For other objects we just run the incorporation procedure
3478 SetLastMergingMsg(obj);
3479 Incorporate(obj, fOutput, merged);
3480 NotifyMemory(obj);
3481
3482 // We are done
3483 return (merged ? 1 : 0);
3484}
3485
3486////////////////////////////////////////////////////////////////////////////////
3487/// Control output redirection to TProof::fLogFileW
3488
3490{
3491 if (on && fProof && fProof->fLogFileW) {
3494 } else if (!on) {
3495 if (fErrorHandler) {
3498 }
3499 }
3500}
3501
3502////////////////////////////////////////////////////////////////////////////////
3503/// Incorporate the content of the received output list 'out' into the final
3504/// output list fOutput. The latter is created if not existing.
3505/// This method short cuts 'StoreOutput + MergeOutput' limiting the memory
3506/// consumption.
3507
3509{
3510 PDB(kOutput,1) Info("AddOutput","Enter");
3511
3512 // We must something to process
3513 if (!out) {
3514 PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3515 return;
3516 }
3517
3518 // Create the output list, if not yet done
3519 if (!fOutput)
3520 fOutput = new THashList;
3521
3522 // Process event lists first
3523 Bool_t merged = kTRUE;
3524 TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3525 if (elists) {
3526
3527 // Create a global event list, result of merging the event lists
3528 // corresponding to the various data set elements
3529 TEventList *evlist = new TEventList("PROOF_EventList");
3530
3531 // Iterate the list of event list segments
3532 TIter nxevl(elists);
3533 TEventList *evl = 0;
3534 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3535
3536 // Find the file offset (fDSet is the current TDSet instance)
3537 // locating the element by name
3538 TIter nxelem(fDSet->GetListOfElements());
3539 TDSetElement *elem = 0;
3540 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3541 if (!strcmp(elem->GetFileName(), evl->GetName()))
3542 break;
3543 }
3544 if (!elem) {
3545 Error("AddOutput", "Found an event list for %s, but no object with"
3546 " the same name in the TDSet", evl->GetName());
3547 continue;
3548 }
3549 Long64_t offset = elem->GetTDSetOffset();
3550
3551 // Shift the list by the number of first event in that file
3552 Long64_t *arr = evl->GetList();
3553 Int_t num = evl->GetN();
3554 if (arr && offset > 0)
3555 for (Int_t i = 0; i < num; i++)
3556 arr[i] += offset;
3557
3558 // Add to the global event list
3559 evlist->Add(evl);
3560 }
3561
3562 // Remove and delete the events lists object to avoid spoiling iteration
3563 // during next steps
3564 out->Remove(elists);
3565 delete elists;
3566
3567 // Incorporate the resulting global list in fOutput
3568 SetLastMergingMsg(evlist);
3569 Incorporate(evlist, fOutput, merged);
3570 NotifyMemory(evlist);
3571 }
3572
3573 // Iterate on the remaining objects in the received list
3574 TIter nxo(out);
3575 TObject *obj = 0;
3576 while ((obj = nxo())) {
3577 SetLastMergingMsg(obj);
3578 Incorporate(obj, fOutput, merged);
3579 // If not merged, drop from the temporary list, as the ownership
3580 // passes to fOutput
3581 if (!merged)
3582 out->Remove(obj);
3583 NotifyMemory(obj);
3584 }
3585
3586 // Done
3587 return;
3588}
3589
3590////////////////////////////////////////////////////////////////////////////////
3591/// Printout the memory record after merging object 'obj'
3592/// This record is used by the memory monitor
3593
3595{
3596 if (fProof && (!IsClient() || fProof->IsLite())){
3597 ProcInfo_t pi;
3598 if (!gSystem->GetProcInfo(&pi)){
3599 // For PROOF-Lite we redirect this output to a the open log file so that the
3600 // memory monitor can pick these messages up
3602 Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
3603 pi.fMemVirtual, pi.fMemResident, obj->GetName());
3604 RedirectOutput(0);
3605 }
3606 // Record also values for monitoring
3608 }
3609}
3610
3611////////////////////////////////////////////////////////////////////////////////
3612/// Set the message to be notified in case of exception
3613
3615{
3616 TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
3617 TProofServ::SetLastMsg(lastMsg);
3618}
3619
3620////////////////////////////////////////////////////////////////////////////////
3621/// Incorporate object 'newobj' in the list 'outlist'.
3622/// The object is merged with an object of the same name already existing in
3623/// the list, or just added.
3624/// The boolean merged is set to kFALSE when the object is just added to 'outlist';
3625/// this happens if the Merge() method does not exist or if a object named as 'obj'
3626/// is not already in the list. If the obj is not 'merged' than it should not be
3627/// deleted, unless outlist is not owner of its objects.
3628/// Return 0 on success, -1 on error.
3629
3631{
3632 merged = kTRUE;
3633
3634 PDB(kOutput,1)
3635 Info("Incorporate", "enter: obj: %p (%s), list: %p",
3636 newobj, newobj ? newobj->ClassName() : "undef", outlist);
3637
3638 // The object and list must exist
3639 if (!newobj || !outlist) {
3640 Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
3641 return -1;
3642 }
3643
3644 // Special treatment for histograms in autobin mode
3645 Bool_t specialH =
3647 if (specialH && newobj->InheritsFrom(TH1::Class())) {
3648 if (!HandleHistogram(newobj, merged)) {
3649 if (merged) {
3650 PDB(kOutput,1) Info("Incorporate", "histogram object '%s' merged", newobj->GetName());
3651 } else {
3652 PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
3653 " appropriate list for delayed merging", newobj->GetName());
3654 }
3655 return 0;
3656 }
3657 }
3658
3659 // Check if an object with the same name exists already
3660 TObject *obj = outlist->FindObject(newobj->GetName());
3661
3662 // If no, add the new object and return
3663 if (!obj) {
3664 outlist->Add(newobj);
3665 merged = kFALSE;
3666 // Done
3667 return 0;
3668 }
3669
3670 // Locate the Merge(TCollection *) method
3671 TMethodCall callEnv;
3672 if (obj->IsA())
3673 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
3674 if (callEnv.IsValid()) {
3675 // Found: put the object in a one-element list
3676 static TList *xlist = new TList;
3677 xlist->Add(newobj);
3678 // Call the method
3679 callEnv.SetParam((Long_t) xlist);
3680 callEnv.Execute(obj);
3681 // Ready for next call
3682 xlist->Clear();
3683 } else {
3684 // Not found: return individual objects
3685 outlist->Add(newobj);
3686 merged = kFALSE;
3687 }
3688
3689 // Done
3690 return 0;
3691}
3692
3693////////////////////////////////////////////////////////////////////////////////
3694/// Low statistic histograms need a special treatment when using autobin
3695
3697{
3698 TH1 *h = dynamic_cast<TH1 *>(obj);
3699 if (!h) {
3700 // Not an histo
3701 return obj;
3702 }
3703
3704 // This is only used if we return (TObject *)0 and there is only one case
3705 // when we set this to kTRUE
3706 merged = kFALSE;
3707
3708 // Does is still needs binning ?
3709 Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3710
3711 // Number of entries
3712 Int_t nent = h->GetBufferLength();
3713 PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3714 h->GetName(), nent, h->GetBufferSize());
3715
3716 // Attach to the list in the outputlists, if any
3717 TList *list = 0;
3718 if (!fOutputLists) {
3719 PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3720 fOutputLists = new TList;
3722 }
3723 list = (TList *) fOutputLists->FindObject(h->GetName());
3724
3725 TH1 *href = 0;
3726 if (tobebinned) {
3727
3728 // The histogram needs to be projected in a reasonable range: we
3729 // do this at the end with all the histos, so we need to create
3730 // a list here
3731 if (!list) {
3732 // Create the list
3733 list = new TList;
3734 list->SetName(h->GetName());
3735 list->SetOwner();
3736 fOutputLists->Add(list);
3737 // Move in it any previously merged object from the output list
3738 if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3739 fOutput->Remove(href);
3740 list->Add(href);
3741 }
3742 }
3743 TIter nxh(list);
3744 while ((href = (TH1 *) nxh())) {
3745 if (href->GetBuffer() && href->GetBufferLength() < nent) break;
3746 }
3747 if (href) {
3748 list->AddBefore(href, h);
3749 } else {
3750 list->Add(h);
3751 }
3752 // Done
3753 return (TObject *)0;
3754
3755 } else {
3756
3757 if (list) {
3758 TIter nxh(list);
3759 while ((href = (TH1 *) nxh())) {
3760 if (href->GetBuffer() || href->GetEntries() < nent) break;
3761 }
3762 if (href) {
3763 list->AddBefore(href, h);
3764 } else {
3765 list->Add(h);
3766 }
3767 // Done
3768 return (TObject *)0;
3769
3770 } else {
3771 // Check if we can 'Add' the histogram to an existing one; this is more efficient
3772 // then using Merge
3773 TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3774 if (hout) {
3775 // Remove the existing histo from the output list ...
3776 fOutput->Remove(hout);
3777 // ... and create either the list to merge in one-go at the end
3778 // (more efficient than merging one by one) or, if too big, merge
3779 // these two and start the 'one-by-one' technology
3780 Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3782 list = new TList;
3783 list->Add(hout);
3784 h->Merge(list);
3785 list->SetOwner();
3786 delete list;
3787 return h;
3788 } else {
3789 list = new TList;
3790 list->SetName(h->GetName());
3791 list->SetOwner();
3792 fOutputLists->Add(list);
3793 // Add the existing and the incoming histos
3794 list->Add(hout);
3795 list->Add(h);
3796 // Done
3797 return (TObject *)0;
3798 }
3799 } else {
3800 // This is the first one; add it to the output list
3801 fOutput->Add(h);
3802 return (TObject *)0;
3803 }
3804 }
3805 }
3806}
3807
3808////////////////////////////////////////////////////////////////////////////////
3809/// Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges
3810/// on the axis (i.e. if they can be just Add-ed for merging).
3811
3813{
3814 Bool_t rc = kFALSE;
3815 if (!h0 || !h1) return rc;
3816
3817 TAxis *a0 = 0, *a1 = 0;
3818
3819 // Check X
3820 a0 = h0->GetXaxis();
3821 a1 = h1->GetXaxis();
3822 if (a0->GetNbins() == a1->GetNbins())
3823 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3824 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3825
3826 // Check Y, if needed
3827 if (h0->GetDimension() > 1) {
3828 rc = kFALSE;
3829 a0 = h0->GetYaxis();
3830 a1 = h1->GetYaxis();
3831 if (a0->GetNbins() == a1->GetNbins())
3832 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3833 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3834 }
3835
3836 // Check Z, if needed
3837 if (h0->GetDimension() > 2) {
3838 rc = kFALSE;
3839 a0 = h0->GetZaxis();
3840 a1 = h1->GetZaxis();
3841 if (a0->GetNbins() == a1->GetNbins())
3842 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3843 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3844 }
3845
3846 // Done
3847 return rc;
3848}
3849
3850////////////////////////////////////////////////////////////////////////////////
3851/// Store received output list.
3852
3854{
3855 PDB(kOutput,1) Info("StoreOutput","Enter");
3856
3857 if ( out == 0 ) {
3858 PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3859 return;
3860 }
3861
3862 TIter next(out);
3863 out->SetOwner(kFALSE); // take ownership of the contents
3864
3865 if (fOutputLists == 0) {
3866 PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3867 fOutputLists = new TList;
3869 }
3870 // process eventlists first
3871 TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3872 if (lists) {
3873 out->Remove(lists);
3874 TEventList *mainList = new TEventList("PROOF_EventList");
3875 out->Add(mainList);
3876 TIter it(lists);
3877 TEventList *aList;
3878 while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3879 // find file offset
3881 TDSetElement *elem;
3882 while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3883 if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3884 break;
3885 }
3886 if (!elem) {
3887 Error("StoreOutput", "found the EventList for %s, but no object with that name "
3888 "in the TDSet", aList->GetName());
3889 continue;
3890 }
3891 Long64_t offset = elem->GetTDSetOffset();
3892
3893 // shift the list by the number of first event in that file
3894 Long64_t *arr = aList->GetList();
3895 Int_t num = aList->GetN();
3896 if (arr && offset)
3897 for (int i = 0; i < num; i++)
3898 arr[i] += offset;
3899
3900 mainList->Add(aList); // add to the main list
3901 }
3902 delete lists;
3903 }
3904
3905 TObject *obj;
3906 while( (obj = next()) ) {
3907 PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3908
3909 TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3910 if ( list == 0 ) {
3911 PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3912 list = new TList;
3913 list->SetName( obj->GetName() );
3914 list->SetOwner();
3915 fOutputLists->Add( list );
3916 }
3917 list->Add( obj );
3918 }
3919
3920 delete out;
3921 PDB(kOutput,1) Info("StoreOutput", "leave");
3922}
3923
3924////////////////////////////////////////////////////////////////////////////////
3925/// Merge feedback lists.
3926
3928{
3929 PDB(kFeedback,1)
3930 Info("MergeFeedback","Enter");
3931
3932 if ( fFeedbackLists == 0 ) {
3933 PDB(kFeedback,1)
3934 Info("MergeFeedback","Leave (no output)");
3935 return 0;
3936 }
3937
3938 TList *fb = new TList; // collection of feedback objects
3939 fb->SetOwner();
3940
3941 TIter next(fFeedbackLists);
3942
3943 TMap *map;
3944 while ( (map = (TMap*) next()) ) {
3945
3946 PDB(kFeedback,2)
3947 Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3948
3949 // turn map into list ...
3950
3951 TList *list = new TList;
3952 TIter keys(map);
3953
3954#ifndef R__TH1MERGEFIXED
3955 Int_t nbmx = -1;
3956 TObject *oref = 0;
3957#endif
3958 while ( TObject *key = keys() ) {
3959 TObject *o = map->GetValue(key);
3960 TH1 *h = dynamic_cast<TH1 *>(o);
3961#ifndef R__TH1MERGEFIXED
3962 // Temporary fix for to cope with the problem in TH1::Merge.
3963 // We need to use a reference histo the one with the largest number
3964 // of bins so that the histos from all submasters can be correctly
3965 // fit in
3966 if (h && !strncmp(o->GetName(),"PROOF_",6)) {
3967 if (h->GetNbinsX() > nbmx) {
3968 nbmx= h->GetNbinsX();
3969 oref = o;
3970 }
3971 }
3972#endif
3973 if (h) {
3974 TIter nxh(list);
3975 TH1 *href= 0;
3976 while ((href = (TH1 *)nxh())) {
3977 if (h->GetBuffer()) {
3978 if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
3979 } else {
3980 if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
3981 }
3982 }
3983 if (href) {
3984 list->AddBefore(href, h);
3985 } else {
3986 list->Add(h);
3987 }
3988 } else {
3989 list->Add(o);
3990 }
3991 }
3992
3993 // clone first object, remove from list
3994#ifdef R__TH1MERGEFIXED
3995 TObject *obj = list->First();
3996#else
3997 TObject *obj = (oref) ? oref : list->First();
3998#endif
3999 list->Remove(obj);
4000 obj = obj->Clone();
4001 fb->Add(obj);
4002
4003 if ( list->IsEmpty() ) {
4004 delete list;
4005 continue;
4006 }
4007
4008 // merge list with clone
4009 TMethodCall callEnv;
4010 if (obj->IsA())
4011 callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
4012 if (callEnv.IsValid()) {
4013 callEnv.SetParam((Long_t) list);
4014 callEnv.Execute(obj);
4015 } else {
4016 // No Merge interface, return copy of individual objects
4017 while ( (obj = list->First()) ) {
4018 fb->Add(obj->Clone());
4019 list->Remove(obj);
4020 }
4021 }
4022
4023 delete list;
4024 }
4025
4026 PDB(kFeedback,1)
4027 Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
4028
4029 return fb;
4030}
4031
4032////////////////////////////////////////////////////////////////////////////////
4033/// Store feedback results from the specified slave.
4034
4036{
4037 PDB(kFeedback,1)
4038 Info("StoreFeedback","Enter");
4039
4040 if ( out == 0 ) {
4041 PDB(kFeedback,1)
4042 Info("StoreFeedback","Leave (empty)");
4043 return;
4044 }
4045
4046 if ( IsClient() ) {
4047 // in client
4048 Feedback(out);
4049 delete out;
4050 return;
4051 }
4052
4053 if (fFeedbackLists == 0) {
4054 PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4055 fFeedbackLists = new TList;
4057 }
4058
4059 TIter next(out);
4060 out->SetOwner(kFALSE); // take ownership of the contents
4061
4062 const char *ord = ((TSlave*) slave)->GetOrdinal();
4063
4064 TObject *obj;
4065 while( (obj = next()) ) {
4066 PDB(kFeedback,2)
4067 Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4068 TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4069 if ( map == 0 ) {
4070 PDB(kFeedback,2)
4071 Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4072 // Map must not be owner (ownership is with regards to the keys (only))
4073 map = new TMap;
4074 map->SetName(obj->GetName());
4075 fFeedbackLists->Add(map);
4076 } else {
4077 PDB(kFeedback,2)
4078 Info("StoreFeedback","%s: removing previous value", ord);
4079 if (map->GetValue(slave))
4080 delete map->GetValue(slave);
4081 map->Remove(slave);
4082 }
4083 map->Add(slave, obj);
4084 PDB(kFeedback,2)
4085 Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4086 }
4087
4088 delete out;
4089 PDB(kFeedback,1)
4090 Info("StoreFeedback","Leave");
4091}
4092
4093////////////////////////////////////////////////////////////////////////////////
4094/// Setup reporting of feedback objects.
4095
4097{
4098 if (IsClient()) return; // Client does not need timer
4099
4100 fFeedback = (TList*) fInput->FindObject("FeedbackList");
4101
4102 PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4103 fFeedback == 0 ? "NOT ":"");
4104
4105 if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4106
4107 // OK, feedback was requested, setup the timer
4109 fFeedbackPeriod = 2000;
4110 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4111 fFeedbackTimer = new TTimer;
4114}
4115
4116////////////////////////////////////////////////////////////////////////////////
4117/// Stop reporting of feedback objects.
4118
4120{
4121 if (fFeedbackTimer == 0) return;
4122
4123 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4124
4126}
4127
4128////////////////////////////////////////////////////////////////////////////////
4129/// Send feedback objects to client.
4130
4132{
4133 PDB(kFeedback,2) Info("HandleTimer","Entry");
4134
4135 if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
4136
4137 // process local feedback objects
4138
4139 TList *fb = new TList;
4140 fb->SetOwner();
4141
4142 TIter next(fFeedback);
4143 while( TObjString *name = (TObjString*) next() ) {
4144 TObject *o = fOutput->FindObject(name->GetName());
4145 if (o != 0) {
4146 fb->Add(o->Clone());
4147 // remove the corresponding entry from the feedback list
4148 TMap *m = 0;
4149 if (fFeedbackLists &&
4150 (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4152 m->DeleteValues();
4153 delete m;
4154 }
4155 }
4156 }
4157
4158 if (fb->GetSize() > 0) {
4159 StoreFeedback(this, fb); // adopts fb
4160 } else {
4161 delete fb;
4162 }
4163
4164 if (fFeedbackLists == 0) {
4165 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
4166 return kFALSE;
4167 }
4168
4169 fb = MergeFeedback();
4170
4171 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4172
4174 m << fb;
4175
4176 // send message to client;
4178
4179 delete fb;
4180
4182
4183 return kFALSE; // ignored?
4184}
4185
4186////////////////////////////////////////////////////////////////////////////////
4187/// Get next packet for specified slave.
4188
4190{
4191 // The first call to this determines the end of initialization
4192 SetInitTime();
4193
4194 if (fProcPackets) {
4195 Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4196 if (bin >= 0) {
4197 if (fProcPackets->GetBinContent(bin) > 0)
4198 fProcPackets->Fill(slave->GetOrdinal(), -1);
4199 }
4200 }
4201
4203
4204 if (e == 0) {
4205 PDB(kPacketizer,2)
4206 Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4207 } else if (e == (TDSetElement*) -1) {
4208 PDB(kPacketizer,2)
4209 Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4210 } else {
4211 PDB(kPacketizer,2)
4212 Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4213 slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4214 e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4215 if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4216 }
4217
4218 return e;
4219}
4220
4221////////////////////////////////////////////////////////////////////////////////
4222/// Is the player running on the client?
4223
4225{
4227}
4228
4229////////////////////////////////////////////////////////////////////////////////
4230/// Draw (support for TChain::Draw()).
4231/// Returns -1 in case of error or number of selected events in case of success.
4232
4234 const char *selection, Option_t *option,
4235 Long64_t nentries, Long64_t firstentry)
4236{
4237 if (!fgDrawInputPars) {
4239 fgDrawInputPars->Add(new TObjString("FeedbackList"));
4240 fgDrawInputPars->Add(new TObjString("PROOF_ChainWeight"));
4241 fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
4242 fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
4243 fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
4244 fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
4245 fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
4246 fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
4247 fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
4248 fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
4249 fgDrawInputPars->Add(new TObjString("PROOF_ListOfAliases"));
4250 }
4251
4252 TString selector, objname;
4253 if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4254 Error("DrawSelect", "parsing arguments");
4255 return -1;
4256 }
4257
4258 TNamed *varexpobj = new TNamed("varexp", varexp);
4259 TNamed *selectionobj = new TNamed("selection", selection);
4260
4261 // Save the current input list
4262 TObject *o = 0;
4263 TList *savedInput = new TList;
4264 TIter nxi(fInput);
4265 while ((o = nxi())) {
4266 savedInput->Add(o);
4267 TString n(o->GetName());
4268 if (fgDrawInputPars &&
4270 !n.BeginsWith("alias:")) fInput->Remove(o);
4271 }
4272
4273 fInput->Add(varexpobj);
4274 fInput->Add(selectionobj);
4275
4276 // Make sure we have an object name
4277 if (objname == "") objname = "htemp";
4278
4279 fProof->AddFeedback(objname);
4280 Long64_t r = Process(set, selector, option, nentries, firstentry);
4281 fProof->RemoveFeedback(objname);
4282
4283 fInput->Remove(varexpobj);
4284 fInput->Remove(selectionobj);
4285 if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
4286 fInput->Remove(opt);
4287 delete opt;
4288 }
4289
4290 delete varexpobj;
4291 delete selectionobj;
4292
4293 // Restore the input list
4294 fInput->Clear();
4295 TIter nxsi(savedInput);
4296 while ((o = nxsi()))
4297 fInput->Add(o);
4298 savedInput->SetOwner(kFALSE);
4299 delete savedInput;
4300
4301 return r;
4302}
4303
4304////////////////////////////////////////////////////////////////////////////////
4305/// Set init time
4306
4308{
4309 if (fPacketizer)
4311}
4312
4313//------------------------------------------------------------------------------
4314
4315
4317
4318////////////////////////////////////////////////////////////////////////////////
4319/// Setup feedback.
4320
4322{
4323 TList *fb = (TList*) fInput->FindObject("FeedbackList");
4324 if (fb) {
4325 PDB(kFeedback,1)
4326 Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4327 } else {
4328 PDB(kFeedback,1)
4329 Info("SetupFeedback","\"FeedbackList\" NOT found");
4330 }
4331
4332 if (fb == 0 || fb->GetSize() == 0) return;
4333
4334 // OK, feedback was requested, setup the timer
4335
4337 fFeedbackPeriod = 2000;
4338 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4339 fFeedbackTimer = new TTimer;
4342
4343 fFeedback = fb;
4344}
4345
4346////////////////////////////////////////////////////////////////////////////////
4347/// Stop feedback.
4348
4350{
4351 if (fFeedbackTimer == 0) return;
4352
4353 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4354
4356}
4357
4358////////////////////////////////////////////////////////////////////////////////
4359/// Handle timer event.
4360
4362{
4363 PDB(kFeedback,2) Info("HandleTimer","Entry");
4364
4365 // If in sequential (0-slave-PROOF) mode we do not have a packetizer
4366 // so we also send the info to update the progress bar.
4367 if (gProofServ) {
4368 Bool_t sendm = kFALSE;
4370 if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4371 sendm = kTRUE;
4372 if (gProofServ->GetProtocol() > 25) {
4373 m << GetProgressStatus();
4374 } else if (gProofServ->GetProtocol() > 11) {
4376 m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4377 << (Float_t) -1. << (Float_t) ps->GetProcTime()
4378 << (Float_t) ps->GetRate() << (Float_t) -1.;
4379 } else {
4381 }
4382 }
4383 if (sendm) gProofServ->GetSocket()->Send(m);
4384 }
4385
4386 if (fFeedback == 0) return kFALSE;
4387
4388 TList *fb = new TList;
4389 fb->SetOwner(kFALSE);
4390
4391 if (fOutput == 0) {
4393 }
4394
4395 if (fOutput) {
4396 TIter next(fFeedback);
4397 while( TObjString *name = (TObjString*) next() ) {
4398 // TODO: find object in memory ... maybe allow only in fOutput ?
4399 TObject *o = fOutput->FindObject(name->GetName());
4400 if (o != 0) fb->Add(o);
4401 }
4402 }
4403
4404 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4405
4407 m << fb;
4408
4409 // send message to client;
4411
4412 delete fb;
4413
4415
4416 return kFALSE; // ignored?
4417}
4418
4419////////////////////////////////////////////////////////////////////////////////
4420/// Handle tree header request.
4421
4423{
4425
4426 TDSet *dset;
4427 (*mess) >> dset;
4428 dset->Reset();
4429 TDSetElement *e = dset->Next();
4430 Long64_t entries = 0;
4431 TFile *f = 0;
4432 TTree *t = 0;
4433 if (!e) {
4434 PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4435 } else {
4436 f = TFile::Open(e->GetFileName());
4437 t = 0;
4438 if (f) {
4439 t = (TTree*) f->Get(e->GetObjName());
4440 if (t) {
4441 t->SetMaxVirtualSize(0);
4442 t->DropBaskets();
4443 entries = t->GetEntries();
4444
4445 // compute #entries in all the files
4446 while ((e = dset->Next()) != 0) {
4447 TFile *f1 = TFile::Open(e->GetFileName());
4448 if (f1) {
4449 TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4450 if (t1) {
4451 entries += t1->GetEntries();
4452 delete t1;
4453 }
4454 delete f1;
4455 }
4456 }
4457 t->SetMaxEntryLoop(entries); // this field will hold the total number of entries ;)
4458 }
4459 }
4460 }
4461 if (t)
4462 answ << TString("Success") << t;
4463 else
4464 answ << TString("Failed") << t;
4465
4466 fSocket->Send(answ);
4467
4468 SafeDelete(t);
4469 SafeDelete(f);
4470}
4471
4472
4473//------------------------------------------------------------------------------
4474
4476
4477////////////////////////////////////////////////////////////////////////////////
4478/// Process specified TDSet on PROOF. Runs on super master.
4479/// The return value is -1 in case of error and TSelector::GetStatus() in
4480/// in case of success.
4481
4482Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
4483 Option_t *option, Long64_t nentries,
4485{
4487 PDB(kGlobal,1) Info("Process","Enter");
4488
4489 TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4490 if (!proof) return -1;
4491
4492 delete fOutput;
4493 fOutput = new THashList;
4494
4496
4497 if (!SendSelector(selector_file)) {
4498 Error("Process", "sending selector %s", selector_file);
4499 return -1;
4500 }
4501
4502 TCleanup clean(this);
4503 SetupFeedback();
4504
4505 if (proof->IsMaster()) {
4506
4507 // make sure the DSet is valid
4508 if (!dset->ElementsValid()) {
4509 proof->ValidateDSet(dset);
4510 if (!dset->ElementsValid()) {
4511 Error("Process", "could not validate TDSet");
4512 return -1;
4513 }
4514 }
4515
4516 TList msds;
4517 msds.SetOwner(); // This will delete TPairs
4518
4519 TList keyholder; // List to clean up key part of the pairs
4520 keyholder.SetOwner();
4521 TList valueholder; // List to clean up value part of the pairs
4522 valueholder.SetOwner();
4523
4524 // Construct msd list using the slaves
4525 TIter nextslave(proof->GetListOfActiveSlaves());
4526 while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4527 TList *submasters = 0;
4528 TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
4529 if (!msd) {
4530 submasters = new TList;
4531 submasters->SetName(sl->GetMsd());
4532 keyholder.Add(submasters);
4533 TList *setelements = new TSortedList(kSortDescending);
4534 setelements->SetName(TString(sl->GetMsd())+"_Elements");
4535 valueholder.Add(setelements);
4536 msds.Add(new TPair(submasters, setelements));
4537 } else {
4538 submasters = dynamic_cast<TList*>(msd->Key());
4539 }
4540 if (submasters) submasters->Add(sl);
4541 }
4542
4543 // Add TDSetElements to msd list
4544 Long64_t cur = 0; //start of next element
4545 TIter nextelement(dset->GetListOfElements());
4546 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4547
4548 if (elem->GetNum()<1) continue; // get rid of empty elements
4549
4550 if (nentries !=-1 && cur>=first+nentries) {
4551 // we are done
4552 break;
4553 }
4554
4555 if (cur+elem->GetNum()-1<first) {
4556 //element is before first requested entry
4557 cur+=elem->GetNum();
4558 continue;
4559 }
4560
4561 if (cur<first) {
4562 //modify element to get proper start
4563 elem->SetNum(elem->GetNum()-(first-cur));
4564 elem->SetFirst(elem->GetFirst()+first-cur);
4565 cur=first;
4566 }
4567
4568 if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4569 cur+=elem->GetNum();
4570 } else {
4571 //modify element to get proper end
4572 elem->SetNum(first+nentries-cur);
4573 cur=first+nentries;
4574 }
4575
4576 TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
4577 if (!msd) {
4578 Error("Process", "data requires mass storage domain '%s'"
4579 " which is not accessible in this proof session",
4580 elem->GetMsd());
4581 return -1;
4582 } else {
4583 TList *elements = dynamic_cast<TList*>(msd->Value());
4584 if (elements) elements->Add(elem);
4585 }
4586 }
4587
4588 TList usedmasters;
4589 TIter nextmsd(msds.MakeIterator());
4590 while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4591 TList *submasters = dynamic_cast<TList*>(msd->Key());
4592 TList *setelements = dynamic_cast<TList*>(msd->Value());
4593
4594 // distribute elements over the masters
4595 Int_t nmasters = submasters ? submasters->GetSize() : -1;
4596 Int_t nelements = setelements ? setelements->GetSize() : -1;
4597 for (Int_t i=0; i<nmasters; i++) {
4598
4599 Long64_t nent = 0;
4600 TDSet set(dset->GetType(), dset->GetObjName(),
4601 dset->GetDirectory());
4602 for (Int_t j = (i*nelements)/nmasters;
4603 j < ((i+1)*nelements)/nmasters;
4604 j++) {
4605 TDSetElement *elem = setelements ?
4606 dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
4607 if (elem) {
4608 set.Add(elem->GetFileName(), elem->GetObjName(),
4609 elem->GetDirectory(), elem->GetFirst(),
4610 elem->GetNum(), elem->GetMsd());
4611 nent += elem->GetNum();
4612 } else {
4613 Warning("Process", "not a TDSetElement object");
4614 }
4615 }
4616
4617 if (set.GetListOfElements()->GetSize()>0) {
4619 TString fn(gSystem->BaseName(selector_file));
4620 TString opt = option;
4621 mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4622
4623 TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
4624 if (sl) {
4625 PDB(kGlobal,1) Info("Process",
4626 "Sending TDSet with %d elements to submaster %s",
4627 set.GetListOfElements()->GetSize(),
4628 sl->GetOrdinal());
4629 sl->GetSocket()->Send(mesg);
4630 usedmasters.Add(sl);
4631
4632 // setup progress info
4633 fSlaves.AddLast(sl);
4637 fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4647 fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4649 fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4651 fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4653 fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4654 } else {
4655 Warning("Process", "not a TSlave object");
4656 }
4657 }
4658 }
4659 }
4660
4661 if ( !IsClient() ) HandleTimer(0);
4662 PDB(kGlobal,1) Info("Process","Calling Collect");
4663 proof->Collect(&usedmasters);
4664 HandleTimer(0);
4665
4666 }
4667
4668 StopFeedback();
4669
4670 PDB(kGlobal,1) Info("Process","Calling Merge Output");
4671 MergeOutput();
4672
4674
4675 return 0;
4676}
4677
4678////////////////////////////////////////////////////////////////////////////////
4679/// Report progress.
4680
4682{
4683 Int_t idx = fSlaves.IndexOf(sl);
4684 fSlaveProgress[idx] = processed;
4685 if (fSlaveTotals[idx] != total)
4686 Warning("Progress", "total events has changed for slave %s", sl->GetName());
4687 fSlaveTotals[idx] = total;
4688
4689 Long64_t tot = 0;
4690 Int_t i;
4691 for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4692 Long64_t proc = 0;
4693 for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4694
4695 Progress(tot, proc);
4696}
4697
4698////////////////////////////////////////////////////////////////////////////////
4699/// Report progress.
4700
4702 Long64_t processed, Long64_t bytesread,
4703 Float_t initTime, Float_t procTime,
4704 Float_t evtrti, Float_t mbrti)
4705{
4706 PDB(kGlobal,2)
4707 Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4708 processed, bytesread, initTime, procTime, evtrti, mbrti);
4709
4710 Int_t idx = fSlaves.IndexOf(sl);
4711 if (fSlaveTotals[idx] != total)
4712 Warning("Progress", "total events has changed for slave %s", sl->GetName());
4713 fSlaveTotals[idx] = total;
4714 fSlaveProgress[idx] = processed;
4715 fSlaveBytesRead[idx] = bytesread;
4716 fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4717 fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4718 fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4719 fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4720
4721 Int_t i;
4722 Long64_t tot = 0;
4723 Long64_t proc = 0;
4724 Long64_t bytes = 0;
4725 Float_t init = -1.;
4726 Float_t ptime = -1.;
4727 Float_t erti = 0.;
4728 Float_t srti = 0.;
4729 Int_t nerti = 0;
4730 Int_t nsrti = 0;
4731 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4732 tot += fSlaveTotals[i];
4733 if (i < fSlaveProgress.GetSize())
4734 proc += fSlaveProgress[i];
4735 if (i < fSlaveBytesRead.GetSize())
4736 bytes += fSlaveBytesRead[i];
4737 if (i < fSlaveInitTime.GetSize())
4738 if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4739 init = fSlaveInitTime[i];
4740 if (i < fSlaveProcTime.GetSize())
4741 if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4742 ptime = fSlaveProcTime[i];
4743 if (i < fSlaveEvtRti.GetSize())
4744 if (fSlaveEvtRti[i] > -1.) {
4745 erti += fSlaveEvtRti[i];
4746 nerti++;
4747 }
4748 if (i < fSlaveMBRti.GetSize())
4749 if (fSlaveMBRti[i] > -1.) {
4750 srti += fSlaveMBRti[i];
4751 nsrti++;
4752 }
4753 }
4754 srti = (nsrti > 0) ? srti / nerti : 0.;
4755
4756 Progress(tot, proc, bytes, init, ptime, erti, srti);
4757}
4758
4759////////////////////////////////////////////////////////////////////////////////
4760/// Progress signal.
4761
4763{
4764 if (pi) {
4765 PDB(kGlobal,2)
4766 Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4767 pi->fTotal, pi->fProcessed, pi->fBytesRead,
4768 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4769 pi->fActWorkers, pi->fEffSessions);
4770
4771 Int_t idx = fSlaves.IndexOf(wrk);
4772 if (fSlaveTotals[idx] != pi->fTotal)
4773 Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4774 fSlaveTotals[idx] = pi->fTotal;
4775 fSlaveProgress[idx] = pi->fProcessed;
4776 fSlaveBytesRead[idx] = pi->fBytesRead;
4777 fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4778 fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4779 fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4780 fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4781 fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4782 fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4783 fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4784
4785 Int_t i;
4786 Int_t nerti = 0;
4787 Int_t nsrti = 0;
4788 TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4789 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4790 pisum.fTotal += fSlaveTotals[i];
4791 if (i < fSlaveProgress.GetSize())
4792 pisum.fProcessed += fSlaveProgress[i];
4793 if (i < fSlaveBytesRead.GetSize())
4794 pisum.fBytesRead += fSlaveBytesRead[i];
4795 if (i < fSlaveInitTime.GetSize())
4796 if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4797 pisum.fInitTime = fSlaveInitTime[i];
4798 if (i < fSlaveProcTime.GetSize())
4799 if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4800 pisum.fProcTime = fSlaveProcTime[i];
4801 if (i < fSlaveEvtRti.GetSize())
4802 if (fSlaveEvtRti[i] > -1.) {
4803 pisum.fEvtRateI += fSlaveEvtRti[i];
4804 nerti++;
4805 }
4806 if (i < fSlaveMBRti.GetSize())
4807 if (fSlaveMBRti[i] > -1.) {
4808 pisum.fMBRateI += fSlaveMBRti[i];
4809 nsrti++;
4810 }
4811 if (i < fSlaveActW.GetSize())
4812 pisum.fActWorkers += fSlaveActW[i];
4813 if (i < fSlaveTotS.GetSize())
4814 if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4815 pisum.fTotSessions = fSlaveTotS[i];
4816 if (i < fSlaveEffS.GetSize())
4817 if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4818 pisum.fEffSessions = fSlaveEffS[i];
4819 }
4820 pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4821
4822 Progress(&pisum);
4823 }
4824}
4825
4826////////////////////////////////////////////////////////////////////////////////
4827/// Send progress and feedback to client.
4828
4830{
4831 if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
4832
4833 Int_t i;
4834 Long64_t tot = 0;
4835 Long64_t proc = 0;
4836 Long64_t bytes = 0;
4837 Float_t init = -1.;
4838 Float_t ptime = -1.;
4839 Float_t erti = 0.;
4840 Float_t srti = 0.;
4841 Int_t nerti = 0;
4842 Int_t nsrti = 0;
4843 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4844 tot += fSlaveTotals[i];
4845 if (i < fSlaveProgress.GetSize())
4846 proc += fSlaveProgress[i];
4847 if (i < fSlaveBytesRead.GetSize())
4848 bytes += fSlaveBytesRead[i];
4849 if (i < fSlaveInitTime.GetSize())
4850 if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4851 init = fSlaveInitTime[i];
4852 if (i < fSlaveProcTime.GetSize())
4853 if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4854 ptime = fSlaveProcTime[i];
4855 if (i < fSlaveEvtRti.GetSize())
4856 if (fSlaveEvtRti[i] > -1.) {
4857 erti += fSlaveEvtRti[i];
4858 nerti++;
4859 }
4860 if (i < fSlaveMBRti.GetSize())
4861 if (fSlaveMBRti[i] > -1.) {
4862 srti += fSlaveMBRti[i];
4863 nsrti++;
4864 }
4865 }
4866 erti = (nerti > 0) ? erti / nerti : 0.;
4867 srti = (nsrti > 0) ? srti / nerti : 0.;
4868
4870 if (gProofServ->GetProtocol() > 25) {
4871 // Fill the message now
4872 TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4873 erti, srti, -1,
4875 m << &pi;
4876 } else {
4877
4878 m << tot << proc << bytes << init << ptime << erti << srti;
4879 }
4880
4881 // send message to client;
4883
4884 if (fReturnFeedback)
4886 else
4887 return kFALSE;
4888}
4889
4890////////////////////////////////////////////////////////////////////////////////
4891/// Setup reporting of feedback objects and progress messages.
4892
4894{
4895 if (IsClient()) return; // Client does not need timer
4896
4898
4899 if (fFeedbackTimer) {
4901 return;
4902 } else {
4904 }
4905
4906 // setup the timer for progress message
4908 fFeedbackPeriod = 2000;
4909 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4910 fFeedbackTimer = new TTimer;
4913}
void Class()
Definition: Class.C:29
@ kPROOF_PROCESS
Definition: MessageTypes.h:56
@ kPROOF_GETTREEHEADER
Definition: MessageTypes.h:66
@ kPROOF_PROGRESS
Definition: MessageTypes.h:61
@ kPROOF_FEEDBACK
Definition: MessageTypes.h:62
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:529
#define d(i)
Definition: RSha256.hxx:102
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
#define h(i)
Definition: RSha256.hxx:106
#define e(i)
Definition: RSha256.hxx:103
static Int_t init()
const Ssiz_t kNPOS
Definition: RtypesCore.h:111
int Int_t
Definition: RtypesCore.h:41
int Ssiz_t
Definition: RtypesCore.h:63
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
const char Option_t
Definition: RtypesCore.h:62
#define ClassImp(name)
Definition: Rtypes.h:363
R__EXTERN Int_t gDebug
Definition: Rtypes.h:90
#define gDirectory
Definition: TDirectory.h:213
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
void Info(const char *location, const char *msgfmt,...)
const Int_t kBreak
Definition: TError.h:40
R__EXTERN Int_t gErrorIgnoreLevel
Definition: TError.h:105
ErrorHandlerFunc_t SetErrorHandler(ErrorHandlerFunc_t newhandler)
Set an errorhandler function. Returns the old handler.
Definition: TError.cxx:106
#define CATCH(n)
Definition: TException.h:63
void Throw(int code)
If an exception context has been set (using the TRY and RETRY macros) jump back to where it was set.
Definition: TException.cxx:27
#define ENDTRY
Definition: TException.h:69
#define TRY
Definition: TException.h:56
static unsigned int total
int type
Definition: TGX11.cxx:120
#define Printf
Definition: TGeoToOCC.h:18
float xmin
Definition: THbookFile.cxx:93
int nentries
Definition: THbookFile.cxx:89
float * q
Definition: THbookFile.cxx:87
float ymin
Definition: THbookFile.cxx:93
float xmax
Definition: THbookFile.cxx:93
float ymax
Definition: THbookFile.cxx:93
const Bool_t kSortDescending
Definition: TList.h:38
#define PDB(mask, level)
Definition: TProofDebug.h:56
#define kPEX_ABORTED
#define kPEX_STOPPED
static Bool_t gAbort
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
#define gROOT
Definition: TROOT.h:410
char * Form(const char *fmt,...)
typedef void((*Func_t)())
@ kReadPermission
Definition: TSystem.h:48
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
static struct mg_connection * fc(struct mg_context *ctx)
Definition: civetweb.c:3728
void Set(Int_t n)
Set size of this array to n floats.
Definition: TArrayF.cxx:105
void Set(Int_t n)
Set size of this array to n ints.
Definition: TArrayI.cxx:105
void Set(Int_t n)
Set size of this array to n long64s.
Definition: TArrayL64.cxx:105
Int_t GetSize() const
Definition: TArray.h:47
Class to manage histogram axis.
Definition: TAxis.h:30
Double_t GetXmax() const
Definition: TAxis.h:134
virtual Int_t FindBin(Double_t x)
Find bin number corresponding to abscissa x.
Definition: TAxis.cxx:279
Double_t GetXmin() const
Definition: TAxis.h:133
Int_t GetNbins() const
Definition: TAxis.h:121
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:75
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2885
virtual void ls(Option_t *option="") const
List (ls) all objects in this collection.
virtual const char * GetName() const
Return name of this collection.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
void SetName(const char *name)
Definition: TCollection.h:204
virtual Int_t GetEntries() const
Definition: TCollection.h:177
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Bool_t IsEmpty() const
Definition: TCollection.h:186
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write all objects in this collection.
Manages an element of a TDSet.
Definition: TDSet.h:66
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Definition: TDSet.cxx:231
const char * GetObjName() const
Definition: TDSet.h:120
Long64_t GetNum() const
Definition: TDSet.h:114
@ kNewRun
Definition: TDSet.h:75
@ kNewPacket
Definition: TDSet.h:76
Float_t GetMaxProcTime() const
Definition: TDSet.h:144
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:253
Long64_t GetTDSetOffset() const
Definition: TDSet.h:128
Bool_t GetValid() const
Definition: TDSet.h:119
const char * GetMsd() const
Definition: TDSet.h:117
const char * GetFileName() const
Definition: TDSet.h:111
Long64_t GetFirst() const
Definition: TDSet.h:112
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:413
Int_t Remove(TDSetElement *elem, Bool_t deleteElem=kTRUE)
Remove TDSetElement 'elem' from the list.
Definition: TDSet.cxx:1577
virtual Bool_t Add(const char *file, const char *objname=0, const char *dir=0, Long64_t first=0, Long64_t num=-1, const char *msd=0)
Add file to list of files to be analyzed.
Definition: TDSet.cxx:1052
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1369
Bool_t ElementsValid()
Check if all elements are valid.
Definition: TDSet.cxx:1556
void Lookup(Bool_t removeMissing=kFALSE, TList **missingFiles=0)
Resolve the end-point URL for the current elements of this data set If the removeMissing option is se...
Definition: TDSet.cxx:1606
TDSetElement * Current() const
Definition: TDSet.h:238
TObject * GetEntryList() const
Definition: TDSet.h:251
void Validate()
Validate the TDSet by opening files.
Definition: TDSet.cxx:1590
const char * GetType() const
Definition: TDSet.h:228
void SetWriteV3(Bool_t on=kTRUE)
Set/Reset the 'OldStreamer' bit in this instance and its elements.
Definition: TDSet.cxx:1875
TList * GetListOfElements() const
Definition: TDSet.h:231
const char * GetDirectory() const
Definition: TDSet.h:230
const char * GetObjName() const
Definition: TDSet.h:229
@ kSomeInvalid
Definition: TDSet.h:161
@ kEmpty
Definition: TDSet.h:159
@ kIsLocal
Definition: TDSet.h:163
@ kValidityChecked
Definition: TDSet.h:160
@ kMultiDSet
Definition: TDSet.h:162
UInt_t Convert(Bool_t toGMT=kFALSE) const
Convert fDatime from TDatime format to the standard time_t format.
Definition: TDatime.cxx:181
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Describe directory structure in memory.
Definition: TDirectory.h:34
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Definition: TDirectory.cxx:497
Utility class to draw objects in the feedback list during queries.
Definition: TDrawFeedback.h:35
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
virtual TEnvRec * Lookup(const char *n) const
Loop over all resource records and return the one with name.
Definition: TEnv.cxx:547
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Definition: TEnv.cxx:736
virtual Long64_t GetEntryNumber(Long64_t)
Definition: TEventIter.cxx:230
virtual Int_t GetLearnEntries()=0
TList * GetPackets()
Definition: TEventIter.h:89
virtual void StopProcess(Bool_t abort)
Set flag to stop the process.
Definition: TEventIter.cxx:142
virtual Long64_t GetCacheSize()=0
static TEventIter * Create(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
Create and instance of the appropriate iterator.
Definition: TEventIter.cxx:150
virtual Int_t GetNextPacket(Long64_t &first, Long64_t &num)=0
virtual void InvalidatePacket()
Invalidated the current packet (if any) by setting the TDSetElement::kCorrupted bit.
Definition: TEventIter.cxx:134
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Int_t GetN() const
Definition: TEventList.h:56
virtual Long64_t * GetList() const
Definition: TEventList.h:55
virtual void Add(const TEventList *list)
Merge contents of alist with this list.
Definition: TEventList.cxx:116
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
Definition: TFileInfo.h:38
TUrl * GetCurrentUrl() const
Return the current url.
Definition: TFileInfo.cxx:248
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
virtual void PrintFiles(Option_t *options)
Print list of files being merged.
virtual Bool_t Merge(Bool_t=kTRUE)
Merge the files.
TList * GetMergeList()
Definition: TFileMerger.h:85
virtual void Reset()
Reset merger file list.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:48
virtual void Close(Option_t *option="")
Close a file.
Definition: TFile.cxx:912
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=0)
Resolve the file type as a function of the protocol field in 'name'.
Definition: TFile.cxx:4690
static Long64_t GetFileBytesRead()
Static function returning the total number of bytes read from all files.
Definition: TFile.cxx:4443
EFileType
File type.
Definition: TFile.h:187
@ kLocal
Definition: TFile.h:187
@ kNet
Definition: TFile.h:187
@ kFile
Definition: TFile.h:187
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3975
static Int_t GetFileReadCalls()
Static function returning the total number of read calls from all files.
Definition: TFile.cxx:4460
1-D histogram with an int per channel (see TH1 documentation)}
Definition: TH1.h:530
The TH1 histogram class.
Definition: TH1.h:56
TAxis * GetZaxis()
Definition: TH1.h:318
virtual Int_t GetDimension() const
Definition: TH1.h:278
TAxis * GetXaxis()
Get the behaviour adopted by the object about the statoverflows. See EStatOverflows for more informat...
Definition: TH1.h:316
Int_t GetBufferLength() const
Definition: TH1.h:232
virtual Bool_t Add(TF1 *h1, Double_t c1=1, Option_t *option="")
Performs the operation: this = this + c1*f1 if errors are defined (see TH1::Sumw2),...
Definition: TH1.cxx:777
virtual Int_t Fill(Double_t x)
Increment bin with abscissa X by 1.
Definition: TH1.cxx:3251
TAxis * GetYaxis()
Definition: TH1.h:317
virtual Double_t GetEntries() const
Return the current number of entries.
Definition: TH1.cxx:4185
virtual Double_t GetBinContent(Int_t bin) const
Return content of bin number bin.
Definition: TH1.cxx:4790
const Double_t * GetBuffer() const
Definition: TH1.h:234
static void SetLimitsFinder(THLimitsFinder *finder)
This static function can be used to specify a finder derived from THLimitsFinder.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:34
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashList.cxx:262
TObject * Remove(TObject *obj)
Remove object from the list.
Definition: THashList.cxx:378
void Clear(Option_t *option="")
Remove all objects from the list.
Definition: THashList.cxx:189
TObject * Next()
Definition: TCollection.h:249
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:818
virtual void AddFirst(TObject *obj)
Add object at the beginning of the list.
Definition: TList.cxx:97
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:354
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:689
virtual void AddAfter(const TObject *after, TObject *obj)
Insert object after object after in the list.
Definition: TList.cxx:247
virtual void AddBefore(const TObject *before, TObject *obj)
Insert object before object before in the list.
Definition: TList.cxx:193
virtual TIterator * MakeIterator(Bool_t dir=kIterForward) const
Return a list iterator.
Definition: TList.cxx:718
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:467
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:655
virtual void AddLast(TObject *obj)
Add object at the end of the list.
Definition: TList.cxx:149
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:44
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Definition: TMD5.cxx:474
virtual TMD5 * Checksum()
Returns checksum of the current content.
Definition: TMacro.cxx:194
void SaveSource(FILE *fp)
Save macro source in file pointer fp.
Definition: TMacro.cxx:381
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:53
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
Definition: TMap.cxx:295
TClass * GetClass() const
Definition: TMessage.h:71
Method or function calling interface.
Definition: TMethodCall.h:37
void ResetParam()
Reset parameter list. To be used before the first call the SetParam().
Bool_t IsValid() const
Return true if the method call has been properly initialized and is usable.
void Execute(const char *, const char *, int *=0)
Execute method on this object with the given parameter string, e.g.
Definition: TMethodCall.h:64
void InitWithPrototype(TClass *cl, const char *method, const char *proto, Bool_t objectIsConst=kFALSE, ROOT::EFunctionMatchMode mode=ROOT::kConversionMatch)
Initialize the method invocation environment.
void SetParam(Long_t l)
Add a long method parameter.
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:164
TNamed()
Definition: TNamed.h:36
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition: TNamed.cxx:140
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
An array of TObjects.
Definition: TObjArray.h:37
Collectable string class.
Definition: TObjString.h:28
const char * GetName() const
Returns name of object.
Definition: TObjString.h:39
Mother of all ROOT objects.
Definition: TObject.h:37
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Definition: TObject.cxx:785
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:172
virtual Bool_t HandleTimer(TTimer *timer)
Execute action in response of a timer timing out.
Definition: TObject.cxx:411
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition: TObject.cxx:144
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:128
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
void MayNotUse(const char *method) const
Use this method to signal that a method (defined in a base class) may not be called in a derived clas...
Definition: TObject.cxx:933
R__ALWAYS_INLINE Bool_t IsZombie() const
Definition: TObject.h:134
@ kOverwrite
overwrite existing object with same name
Definition: TObject.h:88
@ kSingleKey
write collection with single key
Definition: TObject.h:87
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
Definition: TObject.cxx:443
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
void ResetBit(UInt_t f)
Definition: TObject.h:171
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
Set the selector's data members to the corresponding elements of the output list.
static TOutputListSelectorDataMap * FindInList(TCollection *coll)
Find a TOutputListSelectorDataMap in a collection.
Bool_t SetDataMembers(TSelector *sel) const
Given an output list, set the data members of a TSelector.
Class used by TMap to store (key,value) pairs.
Definition: TMap.h:102
void SetValue(TObject *val)
Definition: TMap.h:122
TObject * Value() const
Definition: TMap.h:121
TObject * Key() const
Definition: TMap.h:120
static void SetMemValues()
Record memory usage.
Definition: TPerfStats.cxx:778
static void Setup(TList *input)
Setup the PROOF input list with requested statistics and tracing options.
Definition: TPerfStats.cxx:727
static void Stop()
Terminate the PROOF statistics run.
Definition: TPerfStats.cxx:764
static void GetMemValues(Long_t &vmax, Long_t &rmax)
Get memory usage.
Definition: TPerfStats.cxx:790
static void Start(TList *input, TList *output)
Initialize PROOF statistics run.
Definition: TPerfStats.cxx:744
Implement Tree drawing using PROOF.
Definition: TProofDraw.h:49
Class to find axis limits and synchronize them between workers.
static void AutoBinFunc(TString &key, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Get bining information.
Int_t Unlock()
Unlock the directory.
Int_t Lock()
Locks the directory.
Class to steer the merging of files produced on the workers.
UInt_t GetTypeOpt() const
Bool_t IsMerge() const
const char * GetDir(Bool_t raw=kFALSE) const
const char * GetOutputFileName() const
Bool_t IsRegister() const
const char * GetFileName() const
Bool_t IsMerged() const
Bool_t IsRetrieve() const
TFileCollection * GetFileCollection()
Get instance of the file collection to be used in 'dataset' mode.
Long64_t Merge(TCollection *list)
Merge objects from the list into this object.
void SetDir(const char *dir, Bool_t raw=kFALSE)
void SetMerged(Bool_t merged=kTRUE)
void SetOutputFileName(const char *name)
Set the name of the output file; in the form of an Url.
void SetFileName(const char *name)
void Print(Option_t *option="") const
Dump the class content.
void SetWorkerOrdinal(const char *ordinal)
Int_t AdoptFile(TFile *f)
Adopt a file already open.
TFileMerger * GetFileMerger(Bool_t local=kFALSE)
Get instance of the file merger to be used in 'merge' mode.
Long64_t Process(const char *selector, Long64_t nentries=-1, Option_t *option="")
Process the specified TSelector file 'nentries' times.
TStopwatch * fMergeSTW
Definition: TProofPlayer.h:295
ErrorHandlerFunc_t fErrorHandler
tdset for current processing
Definition: TProofPlayer.h:289
Int_t InitPacketizer(TDSet *dset, Long64_t nentries, Long64_t first, const char *defpackunit, const char *defpackdata)
Init the packetizer Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
void NotifyMemory(TObject *obj)
Printout the memory record after merging object 'obj' This record is used by the memory monitor.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop process after this event.
TObject * HandleHistogram(TObject *obj, Bool_t &merged)
Low statistic histograms need a special treatment when using autobin.
Bool_t IsClient() const
Is the player running on the client?
TList * MergeFeedback()
Merge feedback lists.
void SetInitTime()
Set init time.
TVirtualPacketizer * fPacketizer
Definition: TProofPlayer.h:286
void RedirectOutput(Bool_t on=kTRUE)
Control output redirection to TProof::fLogFileW.
void AddOutput(TList *out)
Incorporate the content of the received output list 'out' into the final output list fOutput.
TString fSelectorFileName
Definition: TProofPlayer.h:293
void Feedback(TList *objs)
Feedback signal.
TMessage * fProcessMessage
Histogram with packets being processed (owned by TPerfStats)
Definition: TProofPlayer.h:292
virtual ~TProofPlayerRemote()
Destructor.
Bool_t MergeOutputFiles()
Merge output in files.
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged)
Incorporate object 'newobj' in the list 'outlist'.
void StoreOutput(TList *out)
Store received output list.
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge objects in output the lists.
void SetSelectorDataMembersFromOutputList()
Set the selector's data members: find the mapping of data members to otuput list entries in the outpu...
void SetMerging(Bool_t on=kTRUE)
Switch on/off merge timer.
Bool_t HistoSameAxis(TH1 *h0, TH1 *h1)
Return kTRUE is the histograms 'h0' and 'h1' have the same binning and ranges on the axis (i....
Int_t AddOutputObject(TObject *obj)
Incorporate the received object 'obj' into the output list fOutput.
virtual void StoreFeedback(TObject *slave, TList *out)
Store feedback results from the specified slave.
void StopFeedback()
Stop reporting of feedback objects.
virtual Bool_t JoinProcess(TList *workers)
Prepares the given list of new workers to join a progressing process.
TProof * GetProof() const
Definition: TProofPlayer.h:307
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet for specified slave.
virtual Bool_t HandleTimer(TTimer *timer)
Send feedback objects to client.
void Progress(Long64_t total, Long64_t processed)
Progress signal.
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (support for TChain::Draw()).
void SetLastMergingMsg(TObject *obj)
Set the message to be notified in case of exception.
virtual Bool_t SendSelector(const char *selector_file)
Send the selector file(s) to master or worker nodes.
void SetupFeedback()
Setup reporting of feedback objects.
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF.
void SetupFeedback()
Setup feedback.
Bool_t HandleTimer(TTimer *timer)
Handle timer event.
void StopFeedback()
Stop feedback.
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
void SetupFeedback()
Setup reporting of feedback objects and progress messages.
Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF.
Bool_t HandleTimer(TTimer *timer)
Send progress and feedback to client.
void Progress(Long64_t total, Long64_t processed)
Progress signal.
Definition: TProofPlayer.h:428
Internal class steering processing in PROOF.
Definition: TProofPlayer.h:60
Long64_t fReadCallsRun
Bytes read in this run.
Definition: TProofPlayer.h:80
TList * fAutoBins
Definition: TProofPlayer.h:63
TString fOutputFilePath
Definition: TProofPlayer.h:97
void HandleGetTreeHeader(TMessage *mess)
Handle tree header request.
void Progress(Long64_t total, Long64_t processed)
Report progress (may not be used in this class).
TFile * fOutputFile
Definition: TProofPlayer.h:98
Bool_t fCreateSelObj
the latest selector
Definition: TProofPlayer.h:69
virtual void StopFeedback()
Stop feedback (may not be used in this class).
virtual Int_t DrawCanvas(TObject *obj)
Draw the object if it is a canvas.
void AddOutput(TList *out)
Incorporate output list (may not be used in this class).
static THashList * fgDrawInputPars
Definition: TProofPlayer.h:103
void Feedback(TList *objs)
Set feedback list (may not be used in this class).
TEventIter * fEvIter
period (ms) for sending intermediate results
Definition: TProofPlayer.h:73
Long64_t fTotalEvents
Definition: TProofPlayer.h:76
TTimer * fFeedbackTimer
class of the latest selector
Definition: TProofPlayer.h:71
TProofPlayer(TProof *proof=0)
Default ctor.
TList * GetOutputList() const
Get output list.
Int_t SavePartialResults(Bool_t queryend=kFALSE, Bool_t force=kFALSE)
Save the partial results of this query to a dedicated file under the user data directory.
void RestorePreviousQuery()
Definition: TProofPlayer.h:169
void RemoveQueryResult(const char *ref)
Remove all query result instances referenced 'ref' from the list of results.
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
Set draw feedback option.
void UpdateProgressInfo()
Update fProgressStatus.
TSelector * fSelector
Definition: TProofPlayer.h:68
Long64_t GetEventsProcessed() const
Definition: TProofPlayer.h:206
Int_t fDrawQueries
Definition: TProofPlayer.h:86
void UpdateAutoBin(const char *name, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)
Update automatic binning parameters for given object "name".
Long64_t GetCacheSize()
Return the size in bytes of the cache.
TProofProgressStatus * GetProgressStatus() const
Definition: TProofPlayer.h:224
Int_t GetLearnEntries()
Return the number of entries in the learning phase.
Long64_t fReadBytesRun
Definition: TProofPlayer.h:79
THashList * fOutput
Definition: TProofPlayer.h:67
Bool_t IsClient() const
Definition: TProofPlayer.h:202
TTimer * fStopTimer
Definition: TProofPlayer.h:89
EExitStatus fExitStatus
status of query in progress
Definition: TProofPlayer.h:75
Long_t fFeedbackPeriod
timer for sending intermediate results
Definition: TProofPlayer.h:72
TList * fQueryResults
Events processed in this run.
Definition: TProofPlayer.h:83
virtual Bool_t JoinProcess(TList *workers)
Not implemented: meaningful only in the remote player. Returns kFALSE.
virtual void MergeOutput(Bool_t savememvalues=kFALSE)
Merge output (may not be used in this class).
void MapOutputListToDataMembers() const
void FeedBackCanvas(const char *name, Bool_t create)
Create/destroy a named canvas for feedback.
Long_t fSaveMemThreshold
Definition: TProofPlayer.h:99
void ClearInput()
Clear input list.
Bool_t fSaveResultsPerPacket
Definition: TProofPlayer.h:101
Long64_t fProcessedRun
Read calls in this run.
Definition: TProofPlayer.h:81
Bool_t fSavePartialResults
Definition: TProofPlayer.h:100
void AddQueryResult(TQueryResult *q)
Add query result to the list, making sure that there are no duplicates.
void StoreFeedback(TObject *slave, TList *out)
Store feedback list (may not be used in this class).
TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)
Get next packet (may not be used in this class).
TTimer * fProcTimeTimer
Definition: TProofPlayer.h:94
TStatus * fSelStatus
iterator on events or objects
Definition: TProofPlayer.h:74
Int_t AddOutputObject(TObject *obj)
Incorporate output object (may not be used in this class).
virtual ~TProofPlayer()
Destructor.
TClass * fSelectorClass
kTRUE when fSelector has been created locally
Definition: TProofPlayer.h:70
TQueryResult * fPreviousQuery
Definition: TProofPlayer.h:85
void SetDispatchTimer(Bool_t on=kTRUE)
Enable/disable the timer to dispatch pening events while processing.
Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process specified TDSet on PROOF worker.
TTimer * fDispatchTimer
Definition: TProofPlayer.h:92
TList * fInput
Definition: TProofPlayer.h:66
TObject * GetOutput(const char *name) const
Get output object by name.
Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)
Finalize query (may not be used in this class).
TDrawFeedback * CreateDrawFeedback(TProof *p)
Draw feedback creation proxy.
TQueryResult * fQuery
Definition: TProofPlayer.h:84
void StopProcess(Bool_t abort, Int_t timeout=-1)
Stop the process after this event.
Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt, TString &selector, TString &objname)
Parse the arguments from var, sel and opt and fill the selector and object name accordingly.
void DeleteDrawFeedback(TDrawFeedback *f)
Delete draw feedback object.
TProofProgressStatus * fProgressStatus
Definition: TProofPlayer.h:77
Int_t ReinitSelector(TQueryResult *qr)
Reinitialize fSelector using the selector files in the query result.
Int_t fMaxDrawQueries
Definition: TProofPlayer.h:87
void SetStopTimer(Bool_t on=kTRUE, Bool_t abort=kFALSE, Int_t timeout=0)
Enable/disable the timer to stop/abort processing.
std::mutex fStopTimerMtx
Definition: TProofPlayer.h:90
TQueryResult * GetQueryResult(const char *ref)
Get query result instances referenced 'ref' from the list of results.
Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Draw (may not be used in this class).
virtual void SetupFeedback()
Set up feedback (may not be used in this class).
void StoreOutput(TList *out)
Store output list (may not be used in this class).
void SetCurrentQuery(TQueryResult *q)
Set current query and save previous value.
EExitStatus GetExitStatus() const
Definition: TProofPlayer.h:205
void AddInput(TObject *inp)
Add object to input list.
void SetProcessing(Bool_t on=kTRUE)
Set processing bit according to 'on'.
Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg)
Check the memory usage, if requested.
TStopwatch * fProcTime
Definition: TProofPlayer.h:95
Int_t AssertSelector(const char *selector_file)
Make sure that a valid selector object Return -1 in case of problems, 0 otherwise.
void HandleRecvHisto(TMessage *mess)
Receive histo from slave.
Long64_t fTotal
Definition: TProof.h:163
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
void SetBytesRead(Long64_t bytesRead)
Long64_t GetEntries() const
void IncEntries(Long64_t entries=1)
void SetReadCalls(Long64_t readCalls)
const char * GetOrdinal() const
Definition: TProofServ.h:253
Int_t GetQuerySeqNum() const
Definition: TProofServ.h:260
const char * GetTopSessionTag() const
Definition: TProofServ.h:246
static void ErrorHandler(Int_t level, Bool_t abort, const char *location, const char *msg)
The PROOF error handler function.
static void GetLocalServer(TString &dsrv)
Extract LOCALDATASERVER info in 'dsrv'.
static void SetLastEntry(Long64_t lastentry)
Set the last entry before exception.
static Float_t GetMemHWM()
MemHWM getter.
Bool_t IsParallel() const
True if in parallel mode.
static void FilterLocalroot(TString &path, const char *url="root://dum/")
If 'path' is local and 'dsrv' is Xrootd, apply 'path.Localroot' settings, if any.
static FILE * SetErrorHandlerFile(FILE *ferr)
Set the file stream where to log (default stderr).
Float_t GetEffSessions() const
Definition: TProofServ.h:264
static void SetLastMsg(const char *lastmsg)
Set the message to be sent back in case of exceptions.
const char * GetDataDirOpts() const
Definition: TProofServ.h:251
TSocket * GetSocket() const
Definition: TProofServ.h:257
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
Bool_t IsMaster() const
Definition: TProofServ.h:293
const char * GetDataDir() const
Definition: TProofServ.h:250
Int_t GetTotSessions() const
Definition: TProofServ.h:262
Long64_t GetMsgSizeHWM() const
Definition: TProofServ.h:274
TProofLockPath * GetCacheLock()
Definition: TProofServ.h:281
static Long_t GetResMemMax()
ResMemMax getter.
const char * GetCacheDir() const
Definition: TProofServ.h:248
static Long_t GetVirtMemMax()
VirtMemMax getter.
const char * GetSessionDir() const
Definition: TProofServ.h:247
Int_t GetProtocol() const
Definition: TProofServ.h:252
const char * GetPrefix() const
Definition: TProofServ.h:276
static Float_t GetMemStop()
MemStop getter.
Bool_t IsTopMaster() const
Definition: TProofServ.h:295
Implementation of TProof controlling PROOF federated clusters.
void ValidateDSet(TDSet *dset)
Validate a TDSet.
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:316
Bool_t fRedirLog
Definition: TProof.h:510
Int_t fMergersCount
Definition: TProof.h:550
void Activate(TList *slaves=0)
Activate slave server list.
Definition: TProof.cxx:2367
FILE * fLogFileW
Definition: TProof.h:512
void AddFeedback(const char *name)
Add object to feedback list.
Definition: TProof.cxx:9961
Bool_t IsParallel() const
Definition: TProof.h:939
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
@ kRunning
Definition: TProof.h:371
void ResetMergePrg()
Reset the merge progress notificator.
Definition: TProof.cxx:2443
Int_t fSeqNum
Definition: TProof.h:526
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Definition: TProof.cxx:2647
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
Send the input data file to the workers.
Definition: TProof.cxx:12353
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
Definition: TProof.cxx:2453
Bool_t IsSync() const
Definition: TProof.h:669
void SetParameter(const char *par, const char *value)
Set input list parameter.
Definition: TProof.cxx:9794
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Definition: TProof.cxx:2282
void RemoveFeedback(const char *name)
Remove object from feedback list.
Definition: TProof.cxx:9972
void FinalizationDone()
Definition: TProof.h:694
ERunStatus GetRunStatus() const
Definition: TProof.h:943
TList * GetListOfActiveSlaves() const
Definition: TProof.h:723
@ kIsClient
Definition: TProof.h:344
void Progress(Long64_t total, Long64_t processed)
Get query progress information.
Definition: TProof.cxx:9173
Long64_t Finalize(Int_t query=-1, Bool_t force=kFALSE)
Finalize the qry-th query in fQueries.
Definition: TProof.cxx:5855
Bool_t IsLite() const
Definition: TProof.h:933
TString fLogFileName
Definition: TProof.h:511
Int_t fProtocol
Definition: TProof.h:571
Int_t SendFile(const char *file, Int_t opt=(kBinary|kForward|kCp|kCpBin), const char *rfile=0, TSlave *sl=0)
Send a file to master or slave servers.
Definition: TProof.cxx:6866
void Feedback(TList *objs)
Get list of feedback objects.
Definition: TProof.cxx:9240
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
Definition: TProof.cxx:6091
@ kBinary
Definition: TProof.h:443
@ kCp
Definition: TProof.h:447
@ kCpBin
Definition: TProof.h:446
@ kForward
Definition: TProof.h:445
virtual void SendInputDataFile()
Send the input data objects to the master; the objects are taken from the dedicated list and / or the...
Definition: TProof.cxx:9584
Int_t fNotIdle
Definition: TProof.h:505
Bool_t IsMaster() const
Definition: TProof.h:936
TList * GetInputList()
Get input list.
Definition: TProof.cxx:9725
TStopwatch fQuerySTW
Definition: TProof.h:593
@ kSync
Definition: TProof.h:350
A container class for query results.
Definition: TQueryResult.h:36
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
void SetNumMergers(Int_t nmergers)
Definition: TQueryResult.h:101
const char * GetLibList() const
Definition: TQueryResult.h:129
Bool_t Matches(const char *ref)
Return TRUE if reference ref matches.
TMacro * GetSelecImp() const
Definition: TQueryResult.h:128
void SetMergeTime(Float_t mergetime)
Definition: TQueryResult.h:98
void SetRecvTime(Float_t recvtime)
Definition: TQueryResult.h:99
TList * GetOutputList()
Definition: TQueryResult.h:131
void Print(Option_t *opt="") const
Print query content. Use opt = "F" for a full listing.
TDatime GetStartTime() const
Definition: TQueryResult.h:117
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
TMacro * GetSelecHdr() const
Definition: TQueryResult.h:127
const char * GetOptions() const
Definition: TQueryResult.h:119
Bool_t IsDraw() const
Definition: TQueryResult.h:144
TList * GetInputList()
Definition: TQueryResult.h:120
virtual void SetFinalized()
Definition: TQueryResult.h:91
Bool_t IsFinalized() const
Definition: TQueryResult.h:145
static const char * GetMacroPath()
Get macro search path. Static utility function.
Definition: TROOT.cxx:2764
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
Definition: TROOT.cxx:2790
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:33
virtual EAbort GetAbort() const
Definition: TSelector.h:75
virtual void ProcessFill(Long64_t)
Definition: TSelector.cxx:317
@ kAbortFile
Definition: TSelector.h:36
@ kAbortProcess
Definition: TSelector.h:36
@ kContinue
Definition: TSelector.h:36
virtual int Version() const
Definition: TSelector.h:54
virtual TList * GetInputList() const
Definition: TSelector.h:70
virtual Bool_t Process(Long64_t)
Definition: TSelector.cxx:330
static Bool_t IsStandardDraw(const char *selec)
Find out if this is a standard selection used for Draw actions (either TSelectorDraw,...
Definition: TSelector.cxx:237
virtual void SlaveBegin(TTree *)
Definition: TSelector.h:57
virtual Bool_t ProcessCut(Long64_t)
Definition: TSelector.cxx:300
virtual void SetOption(const char *option)
Definition: TSelector.h:66
virtual Long64_t GetStatus() const
Definition: TSelector.h:60
virtual void SetInputList(TList *input)
Definition: TSelector.h:68
virtual void Abort(const char *why, EAbort what=kAbortProcess)
Abort processing.
Definition: TSelector.cxx:116
virtual TList * GetOutputList() const
Definition: TSelector.h:71
virtual void SlaveTerminate()
Definition: TSelector.h:72
virtual void Begin(TTree *)
Definition: TSelector.h:56
virtual void Terminate()
Definition: TSelector.h:73
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
Definition: TSelector.cxx:142
virtual Int_t IndexOf(const TObject *obj) const
Return index of object in collection.
Class describing a PROOF worker server.
Definition: TSlave.h:46
TSocket * GetSocket() const
Definition: TSlave.h:134
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
const char * GetOrdinal() const
Definition: TSlave.h:131
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
A sorted doubly linked list.
Definition: TSortedList.h:28
This class holds the status of an ongoing operation and collects error messages.
Definition: TStatus.h:32
void Add(const char *mesg)
Add an error message.
Definition: TStatus.cxx:46
void SetMemValues(Long_t vmem=-1, Long_t rmem=-1, Bool_t master=kFALSE)
Set max memory values.
Definition: TStatus.cxx:159
void SetExitStatus(Int_t est)
Definition: TStatus.h:66
@ kNotOk
Definition: TStatus.h:36
Stopwatch class.
Definition: TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:77
void Reset()
Definition: TStopwatch.h:52
Basic string class.
Definition: TString.h:131
Ssiz_t Length() const
Definition: TString.h:405
TString & Insert(Ssiz_t pos, const char *s)
Definition: TString.h:644
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1896
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
Definition: TString.cxx:2152
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Definition: TString.h:677
const char * Data() const
Definition: TString.h:364
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition: TString.cxx:1738
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:687
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Definition: TString.cxx:876
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2172
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:610
Bool_t IsNull() const
Definition: TString.h:402
TString & Remove(Ssiz_t pos)
Definition: TString.h:668
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
Definition: TString.cxx:2286
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2264
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:619
virtual Func_t DynFindSymbol(const char *module, const char *entry)
Find specific entry point in specified library.
Definition: TSystem.cxx:2044
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition: TSystem.cxx:1264
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
Definition: TSystem.cxx:1013
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1652
virtual const char * GetIncludePath()
Get the list of include path.
Definition: TSystem.cxx:3894
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method split a filename of the form:
Definition: TSystem.cxx:4160
virtual int MakeDirectory(const char *name)
Make a directory.
Definition: TSystem.cxx:834
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2501
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
Definition: TSystem.cxx:4096
virtual int Load(const char *module, const char *entry="", Bool_t system=kFALSE)
Load a shared library.
Definition: TSystem.cxx:1843
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
Definition: TSystem.cxx:1286
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
Definition: TSystem.cxx:869
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition: TSystem.cxx:2532
virtual void DispatchOneEvent(Bool_t pendingOnly=kFALSE)
Dispatch a single event.
Definition: TSystem.cxx:438
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
Definition: TSystem.cxx:941
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:312
virtual const char * WorkingDirectory()
Return working directory.
Definition: TSystem.cxx:878
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Definition: TSystem.cxx:1536
virtual int Unlink(const char *name)
Unlink, i.e.
Definition: TSystem.cxx:1371
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=0)
Redirect standard output (stdout, stderr) to the specified file.
Definition: TSystem.cxx:1702
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition: TSystem.cxx:1472
char * DynamicPathName(const char *lib, Bool_t quiet=kFALSE)
Find a dynamic library called lib using the system search paths.
Definition: TSystem.cxx:2020
Handles synchronous and a-synchronous timer events.
Definition: TTimer.h:51
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
Definition: TTimer.cxx:211
void SetObject(TObject *object)
Set the object to be notified at time out.
Definition: TTimer.cxx:184
virtual Bool_t Notify()
Notify when timer times out.
Definition: TTimer.cxx:143
A TTree object has a header with a name and a title.
Definition: TTree.h:71
virtual void SetMaxEntryLoop(Long64_t maxev=kMaxEntries)
Definition: TTree.h:559
TDirectory * GetDirectory() const
Definition: TTree.h:401
virtual void SetMaxVirtualSize(Long64_t size=0)
Definition: TTree.h:561
virtual void DropBaskets()
Remove some baskets from memory.
Definition: TTree.cxx:4314
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition: TTree.cxx:8590
virtual Long64_t GetEntries() const
Definition: TTree.h:402
virtual void SetAutoFlush(Long64_t autof=-30000000)
This function may be called at the start of a program to change the default value for fAutoFlush.
Definition: TTree.cxx:7870
This class defines a UUID (Universally Unique IDentifier), also known as GUIDs (Globally Unique IDent...
Definition: TUUID.h:42
const char * AsString() const
Return UUID as string. Copy string immediately since it will be reused.
Definition: TUUID.cxx:560
This class represents a WWW compatible URL.
Definition: TUrl.h:35
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
const char * GetFile() const
Definition: TUrl.h:72
const char * GetHost() const
Definition: TUrl.h:70
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
void SetHost(const char *host)
Definition: TUrl.h:87
const char * GetProtocol() const
Definition: TUrl.h:67
Int_t GetPort() const
Definition: TUrl.h:81
virtual Bool_t SendProcessingProgress(Double_t, Double_t, Bool_t=kFALSE)
virtual Bool_t SendProcessingStatus(const char *, Bool_t=kFALSE)
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
virtual Int_t AddWorkers(TList *workers)
Adds new workers.
TNtuple * GetProgressPerf(Bool_t steal=kFALSE)
void SetFailedPackets(TList *list)
Long64_t GetTotalEntries() const
virtual void StopProcess(Bool_t abort, Bool_t stoptimer=kFALSE)
Stop process.
Float_t GetInitTime() const
virtual void SetInitTime()
Set the initialization time.
Bool_t IsValid() const
Long64_t GetBytesRead() const
TList * GetConfigParams(Bool_t steal=kFALSE)
virtual TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
const Int_t n
Definition: legend1.C:16
TH1F * h1
Definition: legend1.C:5
TF1 * f1
Definition: legend1.C:11
static constexpr double nm
static constexpr double pi
static constexpr double ps
Short_t Abs(Short_t d)
Definition: TMathBase.h:120
Definition: first.py:1
Int_t fCpus
Definition: TSystem.h:155
Int_t fPhysRam
Definition: TSystem.h:159
auto * m
Definition: textangle.C:8
auto * t1
Definition: textangle.C:20