86 : fIsDone(
kFALSE), fNode(node), fElement(
elem), fNextEntry(
elem->GetFirst())
165 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
182 std::cout <<
"OBJ: " <<
IsA()->GetName() <<
"\t" <<
fNodeName
184 <<
"\tSlaveCount " <<
fSlaveCnt << std::endl;
200 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
231 : fFileNode(0), fCurFile(0), fCurElem(0)
254 fStatus->SetLastProcTime(0.);
262 Error(
"AddProcessed",
"status arg undefined");
278 PDB(kPacketizer,1)
Info(
"TPacketizer",
"Enter (first %lld, num %lld)", first, num);
291 Error(
"TPacketizer",
"No progress status");
298 Warning(
"TPacketizer",
"PROOF_MaxSlavesPerNode must be positive");
307 Warning(
"TPacketizer",
"PROOF_MaxSlavesPerNode must be positive");
321 Info(
"TPacketizer",
"setting max number of workers per node to %ld",
fMaxSlaveCnt);
344 if (
e->GetValid())
continue;
350 if ( !
url.IsValid() ||
354 }
else if (
url.IsValid() && !
strncmp(
url.GetProtocol(),
"file", 4)) {
356 url.SetProtocol(
"root");
358 host =
url.GetHost();
361 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
363 host =
url.GetHostFQDN();
381 Info(
"TPacketizer",
"Initial number of workers: %d",
nwrks);
392 "processing subset of entries: validating by file? %s",
byfile ?
"yes":
"no");
405 PDB(kPacketizer,2)
Info(
"TPacketizer",
"processing range: first %lld, num %lld", first, num);
413 if (!
e->GetValid())
continue;
423 Info(
"TPacketizer",
" --> '%s'",
e->GetFileName());
427 if (!
e->GetEntryList()){
432 Info(
"TPacketizer",
" --> skip element cur %lld",
cur);
437 if (num != -1 && (first+num <=
cur)) {
440 Info(
"TPacketizer",
" --> drop element cur %lld",
cur);
445 if (
cur <= first || (num != -1 && (first+num <=
cur+
eNum))) {
451 e->SetNum(
e->GetNum() - (first -
cur) );
453 Info(
"TPacketizer",
" --> adjust start %lld and end %lld",
457 if (num != -1 && (first+num <=
cur+
eNum)) {
460 e->SetNum( first + num -
e->GetFirst() -
cur );
462 Info(
"TPacketizer",
" --> adjust end %lld", first + num -
cur);
469 Info(
"TPacketizer",
" --> increment 'cur' by %lld",
eNum);
490 Info(
"TPacketizer",
" --> next cur %lld",
cur);
494 if ( !
url.IsValid() ||
498 }
else if (
url.IsValid() && !
strncmp(
url.GetProtocol(),
"file", 4)) {
500 url.SetProtocol(
"root");
502 host =
url.GetHostFQDN();
505 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
507 host =
url.GetHostFQDN();
520 PDB(kPacketizer,2)
e->Print(
"a");
524 Info(
"TPacketizer",
"processing %lld entries in %d files on %d hosts",
534 Info(
"TPacketizer",
"no valid or non-empty file found: setting invalid");
549 Info(
"Process",
"using alternate fraction of query time as a packet Size: %ld",
573 PDB(kPacketizer,1)
Info(
"TPacketizer",
"Return");
598 Error(
"AddWorkers",
"Null list of new workers!");
606 while ((
sl =
dynamic_cast<TSlave*
>(next()) ))
664 std::cout <<
"TPacketizer::NextUnAllocNode()" << std::endl;
670 PDB(kPacketizer,1)
Info(
"NextUnAllocNode",
"reached workers per node limit (%ld)",
709 Printf(
"TPacketizer::NextActiveNode : ----------------------");
716 Info(
"NextActiveNode",
"reached workers per node limit (%ld)",
fMaxSlaveCnt);
760 while ((key =
slaves.Next()) != 0) {
770 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
793 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
794 slm->GetSocket(),
slm->GetName());
795 mon.Add(
slm->GetSocket());
798 Info(
"ValidateFiles",
799 "mon: %p, wrk: %p, sck: %p", &
mon,
slm,
slm->GetSocket());
827 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
834 if ( (node =
slstat->GetFileNode()) != 0 ) {
853 if (entries < 0 ||
strlen(
elem->GetTitle()) <= 0) {
862 s->GetSocket()->Send(
m );
863 mon.Activate(s->GetSocket());
865 Info(
"ValidateFiles",
866 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
867 s->GetOrdinal(), s->GetName(), s->GetSocket(),
868 dset->IsTree() ?
"tree" :
"objects",
elem->GetFileName(),
869 elem->GetDirectory(),
elem->GetObjName());
872 elem->SetTDSetOffset(entries);
876 if (!
elem->GetEntryList()) {
877 if (
elem->GetFirst() > entries) {
878 Error(
"ValidateFiles",
879 "first (%lld) higher then number of entries (%lld) in %s",
880 elem->GetFirst(), entries,
elem->GetFileName());
882 slstat->fCurFile->SetDone();
886 if (
elem->GetNum() == -1) {
887 elem->SetNum(entries -
elem->GetFirst());
888 }
else if (
elem->GetFirst() +
elem->GetNum() > entries) {
889 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
890 " keys/entries (%lld) in %s",
elem->GetNum(),
elem->GetFirst(),
891 entries,
elem->GetFileName());
892 elem->SetNum(entries -
elem->GetFirst());
895 Info(
"ValidateFiles",
896 "found elem '%s' with %lld entries",
elem->GetFileName(), entries);
910 if (
mon.GetActive() == 0) {
917 Info(
"ValidateFiles",
"{%lld, %lld, %lld): needs to validate %lld more files",
926 Info(
"ValidateFiles",
"no need to validate more files");
935 Info(
"ValidateFiles",
"waiting for %d workers:",
mon.GetActive());
939 while ((s = (
TSocket*) next())) {
940 Info(
"ValidateFiles",
"found sck: %p", s);
943 Info(
"ValidateFiles",
" worker-%s (%s)",
sl->GetOrdinal(),
sl->GetName());
951 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
956 mon.DeActivate(sock);
958 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
963 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
976 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
986 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1002 (*reply) >> entries;
1005 if ((
reply->BufferSize() >
reply->Length())) {
1011 e->SetTDSetOffset(entries);
1012 if ( entries > 0 ) {
1018 if (!
e->GetEntryList()){
1019 if (
e->GetFirst() > entries ) {
1020 Error(
"ValidateFiles",
"first (%lld) higher then number of entries (%lld) in %s",
1021 e->GetFirst(), entries,
e->GetFileName());
1029 if (
e->GetNum() == -1 ) {
1030 e->SetNum( entries -
e->GetFirst() );
1031 }
else if (
e->GetFirst() +
e->GetNum() > entries ) {
1032 Error(
"ValidateFiles",
1033 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1034 e->GetNum(),
e->GetFirst(), entries,
e->GetFileName());
1035 e->SetNum(entries -
e->GetFirst());
1049 Error(
"ValidateFiles",
"cannot get entries for %s (",
e->GetFileName() );
1056 m <<
TString(
Form(
"Cannot get entries for file: %s - skipping",
e->GetFileName()));
1064 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated",
totent);
1104 if (
slstat == 0 )
return 0;
1106 return slstat->GetEntriesProcessed();
1121 while ((key =
nxw()) != 0) {
1151 Info(
"GetNextPacket",
"worker-%s (%s)",
sl->GetOrdinal(),
sl->GetName());
1155 if (
slstat->fCurElem != 0 ) {
1164 if (
sl->GetProtocol() > 18) {
1174 progress =
slstat->AddProcessed(status);
1185 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
1191 if (
r->BufferSize() >
r->Length()) (*r) >>
bytesRead;
1193 if (
r->BufferSize() >
r->Length()) (*r) >>
totev;
1207 Info(
"GetNextPacket",
"worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1208 sl->GetOrdinal(),
sl->GetName(),
1233 if ( file != 0 && file->
IsDone() ) {
1246 if (
slstat->GetFileNode() != 0) {
1263 if (!file)
return 0;
1277 if (num < 1) num = 1;
1282 if ( first + num >= last ) {
1305 Info(
"GetNextPacket",
"%s: %s %lld %lld",
sl->GetOrdinal(), base->
GetFileName(), first, num);
1318 while ((key =
nxw())) {
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
R__EXTERN TProofServ * gProofServ
R__EXTERN TProof * gProof
char * Form(const char *fmt,...)
Formats a string in a circular formatting buffer.
void Printf(const char *fmt,...)
Formats a string in a circular formatting buffer and prints the string.
R__EXTERN TSystem * gSystem
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual Int_t GetEntries() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void Print(Option_t *option="") const override
Default print for collections, calls Print(option, 1).
virtual Int_t GetSize() const
Return the capacity of the collection, i.e. the current total amount of space that has been allocated so far.
Manages an element of a TDSet.
TObject * GetEntryList() const
const char * GetFileName() const
Long64_t GetFirst() const
This class implements a data set to be used for PROOF processing.
A List of entry numbers in a TTree or TChain.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Legacy Code: TEventList is a legacy interface: there will be no bug fixes...
TObject * After(const TObject *obj) const override
Returns the object after object obj.
void Clear(Option_t *option="") override
Remove all objects from the list.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
void Add(TObject *obj) override
TObject * Remove(TObject *obj) override
Remove object from the list.
TObject * First() const override
Return the first object in the list. Returns 0 when list is empty.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj) override
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
TObject * FindObject(const char *keyname) const override
Check if a (key,value) pair exists with keyname as name of the key.
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Mother of all ROOT objects.
virtual const char * GetName() const
Returns name of object.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual TClass * IsA() const
Int_t GetSlaveCnt() const
void Print(Option_t *) const override
This method must be overridden when a class wants to print itself.
TObject * fUnAllocFileNext
const char * GetName() const override
Returns name of object.
void DecSlaveCnt(const char *slave)
TFileNode(const char *name)
TFileStat * GetNextActive()
void Add(TDSetElement *elem)
Int_t GetNumberOfActiveFiles() const
Bool_t IsSortable() const override
TFileStat * GetNextUnAlloc()
void IncSlaveCnt(const char *slave)
Int_t Compare(const TObject *other) const override
Compare abstract method.
void RemoveActive(TFileStat *file)
TFileStat(TFileNode *node, TDSetElement *elem)
Long64_t GetNextEntry() const
void MoveNextEntry(Long64_t step)
TFileNode * GetNode() const
TDSetElement * GetElement() const
TSlaveStat(TSlave *slave)
void SetFileNode(TFileNode *node)
~TSlaveStat() override
Cleanup.
TFileNode * GetFileNode() const
TProofProgressStatus * AddProcessed(TProofProgressStatus *st) override
This class generates packets to be processed on PROOF worker servers.
TFileStat * GetNextActive()
Get next active file.
TFileNode * NextActiveNode()
Get next active node.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Float_t GetCurrentRate(Bool_t &all) override
Get Estimation of the current rate; just summing the current rates of the active workers.
~TPacketizer() override
Destructor.
TFileNode * NextUnAllocNode()
Get next unallocated node.
void Reset()
Reset the internal datastructure for packet distribution.
Int_t GetActiveWorkers() override
Return the number of workers still processing.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r) override
Get next packet.
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t AddWorkers(TList *workers) override
Adds new workers. Returns the number of workers added, or -1 on failure.
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
void IncBytesRead(Long64_t bytesRead)
Long64_t GetEntries() const
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TSocket * GetSocket() const
This class controls a Parallel ROOT Facility, PROOF, cluster.
TObject * GetParameter(const char *par) const
Get specified parameter.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Class describing a PROOF worker server.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
const char * Data() const
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual const char * HostName()
Return the system's host name.
This class represents a WWW compatible URL.
TProofProgressStatus * fStatus
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
Long64_t GetEntriesProcessed() const
Bool_t HandleTimer(TTimer *timer) override
Send progress message to client.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting at entry first and containing num entries.