103 if (ent > 0 &&
entfst > 0) {
106 }
else if (ent <
entfst) {
124 : fIsDone(
kFALSE), fNode(node), fElement(
elem), fNextEntry(
elem->GetFirst())
187 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
195 if ((
e =
fs->GetElement())) {
196 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn,
e->GetName(),
197 e->GetFirst(),
e->GetFirst() +
e->GetNum() - 1,
e->GetNum(),
fs->GetNextEntry());
199 Printf(
"+++ #%d: no element! ", ++nn);
207 if ((
e =
fs->GetElement())) {
208 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn,
e->GetName(),
209 e->GetFirst(),
e->GetFirst() +
e->GetNum() - 1,
e->GetNum(),
fs->GetNextEntry());
211 Printf(
"+++ #%d: no element! ", ++nn);
215 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
271 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
320 : fNodeName(
name), fFiles(
new TList), fUnAllocFileNext(0),
321 fActFiles(
new TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
403 Error(
"UpdateRates",
"no status object!");
406 if (fCurFile->IsDone()) {
414 st->SetLastEntries(
st->GetEntries() - fStatus->GetEntries());
426 if (
st && fDSubSet && fCurElem) {
429 fDSubSet->Add(fCurElem);
433 Error(
"AddProcessed",
"processed subset of current elem undefined");
450 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
451 "enter (first %lld, num %lld)", first, num);
469 Error(
"TPacketizerAdaptive",
"No progress status");
497 Info(
"TPacketizerAdaptive",
"using the basic strategy of TPacketizer");
499 Warning(
"TPacketizerAdaptive",
"unsupported strategy index (%d): ignore",
strategy);
505 Info(
"TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
514 Info(
"TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
526 Info(
"TPacketizerAdaptive",
"Setting max number of workers per node to %ld",
541 Info(
"TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
555 Info(
"TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
559 Info(
"TPacketizerAdaptive",
"packetAsAFraction parameter must be higher than 0");
569 Info(
"TPacketizerAdaptive",
"failed packets will be re-assigned");
612 if (
e->GetValid())
continue;
620 Info(
"TPacketizerAdaptive",
"element name: %s (url: %s)",
e->GetFileName(),
url.GetUrl());
624 if ( !
url.IsValid() ||
628 }
else if (
url.IsValid() && !
strncmp(
url.GetProtocol(),
"file", 4)) {
630 url.SetProtocol(
"root");
632 host =
url.GetHostFQDN();
635 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
637 host =
url.GetHostFQDN();
647 if (
strncmp(
url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
648 disk = os->GetName();
665 Info(
"TPacketizerAdaptive",
"creating new node '%s' or the element",
nodeStr.Data());
668 Info(
"TPacketizerAdaptive",
"adding element to existing node '%s'",
nodeStr.Data());
693 Info(
"TPacketizerAdaptive",
694 "processing subset of entries: validating by file? %s",
byfile ?
"yes":
"no");
709 Info(
"TPacketizerAdaptive",
710 "processing range: first %lld, num %lld", first, num);
718 if (!
e->GetValid())
continue;
724 Info(
"TPacketizerAdaptive",
"processing element '%s'",
e->GetFileName());
726 Info(
"TPacketizerAdaptive",
727 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)",
eFirst,
eNum,
cur,
e->GetEntryList());
729 if (!
e->GetEntryList()) {
734 Info(
"TPacketizerAdaptive",
" --> skip element cur %lld",
cur);
739 if (num != -1 && (first+num <=
cur)) {
742 Info(
"TPacketizerAdaptive",
" --> drop element cur %lld",
cur);
747 if (
cur <= first || (num != -1 && (first+num <=
cur+
eNum))) {
753 e->SetNum(
e->GetNum() - (first -
cur) );
755 Info(
"TPacketizerAdaptive",
" --> adjust start %lld and end %lld",
759 if (num != -1 && (first+num <=
cur+
eNum)) {
762 e->SetNum( first + num -
e->GetFirst() -
cur );
764 Info(
"TPacketizerAdaptive",
" --> adjust end %lld", first + num -
cur);
771 Info(
"TPacketizerAdaptive",
" --> increment 'cur' by %lld",
eNum);
785 Info(
"TPacketizerAdaptive",
" --> entry-list element: %lld entries",
eNum);
790 Info(
"TPacketizerAdaptive",
" --> event-list element: %lld entries (evl:%p)",
eNum,
evl);
794 Info(
"TPacketizerAdaptive",
" --> empty entry- or event-list element!");
799 Info(
"TPacketizerAdaptive",
" --> next cur %lld",
cur);
803 if ( !
url.IsValid() ||
807 }
else if (
url.IsValid() && !
strncmp(
url.GetProtocol(),
"file", 4)) {
809 url.SetProtocol(
"root");
811 host =
url.GetHostFQDN();
814 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
816 host =
url.GetHostFQDN();
826 if (
strncmp(
url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
827 disk = os->GetName();
845 Info(
"TPacketizerAdaptive",
" --> creating new node '%s' for element",
nodeStr.Data());
848 Info(
"TPacketizerAdaptive",
" --> adding element to exiting node '%s'",
nodeStr.Data());
855 PDB(kPacketizer,2)
e->Print(
"a");
858 Info(
"TPacketizerAdaptive",
"processing %lld entries in %d files on %d hosts",
872 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
"return");
905 if (
fn->GetMySlaveCnt() == 0) {
912 Info(
"InitStats",
"no valid or non-empty file found: setting invalid");
925 PDB(kPacketizer,1)
Info(
"InitStats",
"return");
939 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->
GetName());
968 Info(
"GetNextUnAlloc",
"found! (host: %s)",
uu.GetHost());
973 Warning(
"GetNextUnAlloc",
"unallocate entry %d is empty!", i);
980 Info(
"GetNextUnAlloc",
"reached Workers-per-Node Limit (%ld)",
fMaxSlaveCnt);
986 while (file == 0 && ((node =
NextNode()) != 0)) {
988 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->
GetName());
1001 PDB(kPacketizer, 2) {
1003 Info(
"GetNextUnAlloc",
"no file found!");
1019 PDB(kPacketizer,2) {
1065 PDB(kPacketizer,2) {
1066 Info(
"NextActiveNode",
"enter");
1074 Info(
"NextActiveNode",
"reached Workers-per-Node limit (%ld)",
fMaxSlaveCnt);
1118 while ((key =
slaves.Next()) != 0) {
1121 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
1131 if (
fn->GetMySlaveCnt() <
fncnt) {
1139 fnmin->IncMySlaveCnt();
1141 Info(
"Reset",
"assigning node '%s' to '%s' (cnt: %d)",
1167 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
1168 slm->GetSocket(),
slm->GetName());
1169 mon.Add(
slm->GetSocket());
1173 mon.DeActivateAll();
1201 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
1209 if ((node =
slstat->GetFileNode()) != 0) {
1227 if (entries < 0 ||
strlen(
elem->GetTitle()) <= 0) {
1236 s->GetSocket()->Send(
m );
1237 mon.Activate(s->GetSocket());
1239 Info(
"ValidateFiles",
1240 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1241 s->GetOrdinal(), s->GetName(), s->GetSocket(),
1242 dset->IsTree() ?
"tree" :
"objects",
elem->GetFileName(),
1243 elem->GetDirectory(),
elem->GetObjName());
1246 elem->SetTDSetOffset(entries);
1250 if (!
elem->GetEntryList()) {
1251 if (
elem->GetFirst() > entries) {
1252 Error(
"ValidateFiles",
1253 "first (%lld) higher then number of entries (%lld) in %s",
1254 elem->GetFirst(), entries,
elem->GetFileName());
1256 slstat->fCurFile->SetDone();
1260 if (
elem->GetNum() == -1) {
1261 elem->SetNum(entries -
elem->GetFirst());
1262 }
else if (
elem->GetFirst() +
elem->GetNum() > entries) {
1263 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
1264 " keys/entries (%lld) in %s",
elem->GetNum(),
elem->GetFirst(),
1265 entries,
elem->GetFileName());
1266 elem->SetNum(entries -
elem->GetFirst());
1269 Info(
"ValidateFiles",
1270 "found elem '%s' with %lld entries",
elem->GetFileName(), entries);
1287 if (
mon.GetActive() == 0) {
1294 Info(
"ValidateFiles",
"{%lld, %lld, %lld}: needs to validate %lld more files",
1303 Info(
"ValidateFiles",
"no need to validate more files");
1311 PDB(kPacketizer,3) {
1312 Info(
"ValidateFiles",
"waiting for %d slaves:",
mon.GetActive());
1318 Info(
"ValidateFiles",
" worker-%s (%s)",
1319 sl->GetOrdinal(),
sl->GetName());
1327 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
1328 mon.DeActivateAll();
1332 mon.DeActivate(sock);
1334 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
1339 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
1350 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
1363 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1379 (*reply) >> entries;
1382 if ((
reply->BufferSize() >
reply->Length())) {
1388 e->SetTDSetOffset(entries);
1394 if (!
e->GetEntryList()) {
1395 if (
e->GetFirst() > entries) {
1396 Error(
"ValidateFiles",
1397 "first (%lld) higher then number of entries (%lld) in %s",
1398 e->GetFirst(), entries,
e->GetFileName());
1406 if (
e->GetNum() == -1) {
1407 e->SetNum(entries -
e->GetFirst());
1408 }
else if (
e->GetFirst() +
e->GetNum() > entries) {
1409 Error(
"ValidateFiles",
1410 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1411 e->GetNum(),
e->GetFirst(), entries,
e->GetFileName());
1412 e->SetNum(entries -
e->GetFirst());
1426 Error(
"ValidateFiles",
"cannot get entries for file: %s - skipping",
e->GetFileName() );
1433 m <<
TString(
Form(
"Cannot get entries for file: %s - skipping",
1442 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated",
totent);
1466 if (
el->GetValid()) {
1520 PDB(kPacketizer,3) {
1521 Info(
"CalculatePacketSize",
"%s: switching off synchronization of packet and cache sizes:",
slstat->GetOrdinal());
1522 Info(
"CalculatePacketSize",
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1544 Info(
"CalculatePacketSize",
"%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1555 Info(
"CalculatePacketSize",
"%s: num: %lld",
slstat->GetOrdinal(), num);
1558 if (num < 1) num = 1;
1575 Error(
"AddProcessed",
"%s: TSlaveStat instance for worker %s not found!",
1576 (
sl ?
sl->GetOrdinal() :
"x.x"),
1577 (
sl ?
sl->GetName() :
"**undef**"));
1583 if (
slstat->fCurElem != 0 ) {
1596 progress =
slstat->AddProcessed(status);
1598 (*fProgressStatus) += *progress;
1600 slstat->UpdateRates(status);
1607 Info(
"AddProcessed",
"%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1613 slstat->fCurElem->GetFileName(),
1633 Error(
"AddProcessed",
"%s: processed too much? (%lld, %lld)",
1674 Error(
"GetNextPacket",
"TSlaveStat instance for worker %s not found!",
1675 (
sl ?
sl->GetName() :
"**undef**"));
1687 if (
slstat->fCurElem != 0 ) {
1694 if (
sl->GetProtocol() > 18) {
1699 if (
sl->GetProtocol() > 25) {
1712 if (
r->BufferSize() >
r->Length()) (*r) >>
bytesRead;
1715 if (
r->BufferSize() >
r->Length()) (*r) >>
totev;
1723 Error(
"GetNextPacket",
"%s: the worker processed a different # of entries",
sl->GetOrdinal());
1726 Error(
"GetNextPacket",
"%s: processed too many entries! (%lld, %lld)",
1736 Info(
"GetNextPacket",
"%s: file '%s' turned corrupted: invalidating file (%lld)",
1740 Info(
"GetNextPacket",
"%s: %d entries un-processed",
sl->GetOrdinal(),
nunproc);
1754 Info(
"GetNextPacket",
"%s: removed file: %s, entries left: %lld",
sl->GetOrdinal(),
1759 Info(
"GetNextPacket",
"%s: file '%s' could not be open: invalidating related element",
1773 Info(
"GetNextPacket",
"%s: error raised by worker, but TFileStat object invalid:"
1774 " protocol error?",
sl->GetOrdinal());
1791 Info(
"GetNextPacket",
"%s: entries processed: %lld - looking for a packet from node '%s'",
1795 if ( file != 0 && file->
IsDone() ) {
1817 if (
slstat->GetFileNode() != 0 ) {
1832 if (
slstat->GetFileNode()->GetRunSlaveCnt() >
1833 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1865 file =
slstat->GetFileNode()->GetNextUnAlloc();
1867 file =
slstat->GetFileNode()->GetNextActive();
1883 if (file == 0)
return 0;
1893 Error(
"GetNextPacket",
1894 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1918 if ( first + num * 1.5 >= last ) {
1939 Info(
"GetNextPacket",
"%s: %s %lld %lld (%lld)",
sl->GetOrdinal(), base->
GetFileName(), first, first + num - 1, num);
1952 while ((key =
nxw())) {
1972 while ((key =
nxw()) != 0) {
2018 while ((key =
nxw()) != 0) {
2025 dt =
now -
slstat->GetProgressStatus()->GetLastUpdate();
2036 Info(
"GetEstEntriesProcessed",
"%s: e:%lld rate:%f dt:%f e:%lld",
2037 slstat->fSlave->GetOrdinal(),
2045 Info(
"GetEstEntriesProcessed",
2046 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2055 return ((
all) ? 0 : 1);
2073 Error(
"MarkBad",
"Worker does not exist");
2079 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2099 if (
e->MergeElement(
enxt) >= 0) {
2113 Warning(
"MarkBad",
"subset processed by bad worker not found!");
2115 (*fProgressStatus) -= *(
slaveStat->GetProgressStatus());
2132 Error(
"ReassignPacket",
"empty packet!");
2140 if (!
url.IsValid() ||
strncmp(
url.GetProtocol(),
"root", 4)) {
2143 host =
url.GetHost();
2161 (*listOfMissingFiles)->Add((
TObject *)
fi);
2175 Error(
"SplitPerHost",
"Empty list of packets!");
2178 if (elements->
GetSize() <= 0) {
2179 Error(
"SplitPerHost",
"The input list contains no elements");
2188 Error(
"SplitPerHost",
"Error removing a missing file");
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize fs
R__EXTERN TProofServ * gProofServ
R__EXTERN TProof * gProof
char * Form(const char *fmt,...)
Formats a string in a circular formatting buffer.
void Printf(const char *fmt,...)
Formats a string in a circular formatting buffer and prints the string.
R__EXTERN TSystem * gSystem
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void Print(Option_t *option="") const override
Default print for collections, calls Print(option, 1).
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
void SetEntries(Long64_t ent)
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
TObject * GetEntryList() const
const char * GetFileName() const
Long64_t GetFirst() const
This class implements a data set to be used for PROOF processing.
A List of entry numbers in a TTree or TChain.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
<div class="legacybox"><h2>Legacy Code</h2> TEventList is a legacy interface: there will be no bug fi...
Class describing a generic file including meta information.
TObject * After(const TObject *obj) const override
Returns the object after object obj.
void Clear(Option_t *option="") override
Remove all objects from the list.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
void Add(TObject *obj) override
TObject * Remove(TObject *obj) override
Remove object from the list.
TObject * Last() const override
Return the last object in the list. Returns 0 when list is empty.
TObject * First() const override
Return the first object in the list. Returns 0 when list is empty.
TObject * At(Int_t idx) const override
Returns the object at position idx. Returns 0 if idx is out of range.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj) override
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
TObject * Remove(TObject *key) override
Remove the (key,value) pair with key from the map.
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
const char * GetName() const override
Returns name of object.
Collectable string class.
Mother of all ROOT objects.
virtual const char * GetName() const
Returns name of object.
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Int_t GetRunSlaveCnt() const
Int_t GetMySlaveCnt() const
Int_t GetNumberOfActiveFiles() const
Int_t GetSlaveCnt() const
void IncEvents(Long64_t nEvents)
const char * GetName() const override
Returns name of object.
Long64_t GetNEvents() const
void Print(Option_t *=0) const override
This method must be overridden when a class wants to print itself.
void IncProcessed(Long64_t nEvents)
void DecreaseProcessed(Long64_t nEvents)
TFileStat * GetNextActive()
TSortedList * fFilesToProcess
void DecExtSlaveCnt(const char *slave)
Long64_t GetEventsLeftPerSlave() const
Int_t Compare(const TObject *other) const override
Compare abstract method.
void IncExtSlaveCnt(const char *slave)
TFileStat * GetNextUnAlloc()
void RemoveActive(TFileStat *file)
Bool_t IsSortable() const override
TFileNode(const char *name, Int_t strategy, TSortedList *files)
Long64_t GetProcessed() const
TObject * fUnAllocFileNext
Int_t GetExtSlaveCnt() const
void Add(TDSetElement *elem, Bool_t tolist)
void Print(Option_t *=0) const override
This method must be overridden when a class wants to print itself.
void MoveNextEntry(Long64_t step)
Bool_t IsSortable() const override
TDSetElement * GetElement() const
Int_t Compare(const TObject *obj) const override
Compare abstract method.
Long64_t GetNextEntry() const
TFileStat(TFileNode *node, TDSetElement *elem, TList *file)
TFileNode * GetNode() const
TFileNode * GetFileNode() const
Long64_t GetEntriesProcessed() const
Int_t GetLocalEventsLeft()
TProofProgressStatus * AddProcessed(TProofProgressStatus *st=0) override
Add the current element to the fDSubSet (subset processed by this worker) and if the status arg is gi...
TSlaveStat(TSlave *slave)
Constructor.
TProofProgressStatus * GetProgressStatus()
~TSlaveStat() override
Cleanup.
void UpdateRates(TProofProgressStatus *st)
Update packetizer rates.
TList * GetProcessedSubSet()
void SetFileNode(TFileNode *node)
Double_t GetProcTime() const
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to...
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles) override
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Double_t fMaxEntriesRatio
TFileNode * NextNode()
Get next node which has unallocated files.
Float_t GetCurrentRate(Bool_t &all) override
Get an estimate of the current rate, obtained by summing the current rates of the active workers.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0) override
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
Int_t GetActiveWorkers() override
Return the number of workers still processing.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls) override
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
Float_t fBaseLocalPreference
Float_t fFractionOfRemoteFiles
Long64_t fNEventsOnRemLoc
~TPacketizerAdaptive() override
Destructor.
TFileNode * NextActiveNode()
Get next active node.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r) override
Get next packet; a meaningful difference from TPacketizer is the fact that this packetizer,...
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
Double_t GetProcTime() const
Double_t GetLastUpdate() const
Long64_t GetEntries() const
Double_t GetCPUTime() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
This class controls a Parallel ROOT Facility, PROOF, cluster.
TObject * GetParameter(const char *par) const
Get specified parameter.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Class describing a PROOF worker server.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
A sorted doubly linked list.
const char * Data() const
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
virtual const char * HostName()
Return the system's host name.
Basic time type with millisecond precision.
This class represents a WWW compatible URL.
const char * GetHost() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
TProofProgressStatus * fStatus
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
TProofProgressStatus * fProgressStatus
Long64_t GetReadCalls() const
Long64_t GetEntriesProcessed() const
Double_t GetCumProcTime() const
Bool_t HandleTimer(TTimer *timer) override
Send progress message to client.
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry with num entries.