91 fRate(0), fTimeInstant(0), fCircLvl(5)
118 Int_t ne = fCircNtp->GetEntries();
121 fCircNtp->Fill(0., 0);
126 fCircNtp->GetEntry(
ne-1);
131 fCircNtp->GetEntry(0);
136 Info(
"UpdatePerformance",
"time:%f, dtime:%f, nevts:%lld, speed: %f",
151 fStatus->SetLastProcTime(0.);
159 Error(
"AddProcessed",
"status arg undefined");
175 PDB(kPacketizer,1)
Info(
"TPacketizerUnit",
"enter (num %lld)", num);
188 Info(
"TPacketizerUnit",
"forcing the same cycles on each worker");
196 Info(
"TPacketizerUnit",
"size of the calibration packets: %.2f %% of average number per worker",
fCalibFrac);
202 Warning(
"TPacketizerUnit",
"PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
234 if (
slave->GetParallel() > 0) {
242 Info(
"TPacketizerUnit",
"node '%s' has NO active worker: excluded from work distribution",
slave->GetOrdinal());
250 Warning(
"TPacketizerUnit",
"some problems assigning work");
257 PDB(kPacketizer,1)
Info(
"TPacketizerUnit",
"return");
266 Error(
"AssignWork",
"assigned a negative number (%lld) of cycles - protocol error?", num);
272 Info(
"AssignWork",
"assigned %lld additional cycles (new total: %lld)", num,
fTotalEntries);
330 while ((key =
nxw()) != 0) {
355 Warning(
"GetNextPacket",
"Received a packet request from an unknown slave: %s:%s",
356 sl->GetName(),
sl->GetOrdinal());
361 Info(
"GetNextPacket",
"worker-%s: fAssigned %lld\t",
sl->GetOrdinal(),
fAssigned);
371 if (
sl->GetProtocol() > 18) {
379 numev = status->GetEntries() -
slstat->GetEntriesProcessed();
380 progress =
slstat->AddProcessed(status);
385 totev = status->GetEntries();
391 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
397 if (
r->BufferSize() >
r->Length()) (*r) >>
bytesRead;
399 if (
r->BufferSize() >
r->Length()) (*r) >>
totev;
403 slstat->GetProgressStatus()->SetLastUpdate();
412 Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
413 sl->GetOrdinal(),
sl->GetName(),
423 Info(
"GetNextPacket",
"worker-%s (%s) is done (%lld cycles)",
424 sl->GetOrdinal(),
sl->GetName(),
slstat->GetEntriesProcessed());
437 Error(
"GetNextPacket",
"problems assigning additional work: stop");
461 if (
slstat->fCircNtp->GetEntries() <= 0) {
465 if (num < 1) num = (
avg >= 1) ?
avg : 1;
467 Info(
"GetNextPacket",
"calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
471 slstat->UpdatePerformance(0.);
502 Info(
"GetNextPacket",
"%d: worker-%s: rate %lf /s (sum: %lf /s)",
505 Warning(
"GetNextPacket",
"dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
512 Error(
"GetNextPacket",
"no worker has consistent information: stop processing!");
523 Info(
"GetNextPacket",
"rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
530 Info(
"GetNextPacket",
"worker-%s (%s): raw packet size: %lld",
sl->GetOrdinal(),
sl->GetName(), num);
538 Info(
"GetNextPacket",
"worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
544 Info(
"GetNextPacket",
"worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
557 num = (num > 1) ? num : 1;
571 Info(
"GetNextPacket",
"worker-%s: num %lld, processing %lld, remaining %lld",
sl->GetOrdinal(),
588 Error(
"AddWorkers",
"Null list of new workers!");
596 while ((
sl =
dynamic_cast<TSlave*
>(next()) ))
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
R__EXTERN TProofServ * gProofServ
virtual Int_t GetEntries() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
This class implements a data set to be used for PROOF processing.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
void Add(TObject *obj) override
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj) override
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
A simple TTree restricted to a list of double variables only.
Mother of all ROOT objects.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
~TSlaveStat() override
Destructor.
TSlaveStat(TSlave *sl, TList *input)
Main constructor.
void UpdatePerformance(Double_t time)
Update the circular ntuple.
TProofProgressStatus * AddProcessed(TProofProgressStatus *st) override
Update the status info to the 'st'.
This packetizer generates packets of generic units, representing the number of times an operation cyc...
Float_t GetCurrentRate(Bool_t &all) override
Get an estimate of the current rate, obtained by summing the current rates of the active workers.
Double_t GetCurrentTime()
Get current time.
Int_t AddWorkers(TList *workers) override
Adds new workers. Returns the number of workers added, or -1 on failure.
~TPacketizerUnit() override
Destructor.
Int_t AssignWork(TDSet *, Long64_t, Long64_t num) override
Assign work to be done to this packetizer.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r) override
Get next packet.
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TDSetElement * GetNextPacket(Long64_t totalEntries=-1)
Get next range of entries to be processed on this server.
Bool_t IsTopMaster() const
TObject * GetParameter(const char *par) const
Get specified parameter.
Class describing a PROOF worker server.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
void Continue()
Resume a stopped stopwatch.
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
virtual void SetCircular(Long64_t maxEntries)
Enable/Disable circularity for this tree.
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
TProofProgressStatus * fStatus
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
Long64_t GetEntriesProcessed() const
Bool_t HandleTimer(TTimer *timer) override
Send progress message to client.