#include "TEnv.h"
#include "TEventIter.h"
#include "TFriendElement.h"
#include "TCollection.h"
#include "TDSet.h"
#include "TFile.h"
#include "TKey.h"
#include "TProofDebug.h"
#include "TSelector.h"
#include "TTimeStamp.h"
#include "TTree.h"
#include "TTreeCache.h"
#include "TTreeCacheUnzip.h"
#include "TVirtualPerfStats.h"
#include "TEventList.h"
#include "TEntryList.h"
#include "TList.h"
#include "TMap.h"
#include "TObjString.h"
#include "TRegexp.h"
#include "TProofServ.h"
#include "TError.h"
#if defined(R__MACOSX)
#include "fcntl.h"
#endif
ClassImp(TEventIter)
TEventIter::TEventIter()
{
fDSet = 0;
fElem = 0;
fFile = 0;
fDir = 0;
fSel = 0;
fFirst = 0;
fCur = -1;
fNum = 0;
fStop = kFALSE;
fOldBytesRead = 0;
fEventList = 0;
fEventListPos = 0;
fEntryList = 0;
fEntryListPos = 0;
fElemFirst = 0;
fElemNum = 0;
fElemCur = -1;
}
TEventIter::TEventIter(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
: fDSet(dset), fSel(sel)
{
fElem = 0;
fFile = 0;
fDir = 0;
fFirst = first;
fCur = -1;
fNum = num;
fStop = kFALSE;
fEventList = 0;
fEventListPos = 0;
fEntryList = 0;
fEntryListPos = 0;
fOldBytesRead = 0;
fElemFirst = 0;
fElemNum = 0;
fElemCur = -1;
}
TEventIter::~TEventIter()
{
delete fFile;
}
void TEventIter::InvalidatePacket()
{
if (fElem) fElem->SetBit(TDSetElement::kCorrupted);
}
void TEventIter::StopProcess(Bool_t )
{
fStop = kTRUE;
}
TEventIter *TEventIter::Create(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
{
if (dset->TestBit(TDSet::kEmpty)) {
return new TEventIterUnit(dset, sel, num);
} else if (dset->IsTree()) {
return new TEventIterTree(dset, sel, first, num);
} else {
return new TEventIterObj(dset, sel, first, num);
}
}
Int_t TEventIter::LoadDir()
{
Int_t ret = 0;
if ( fFile == 0 || fFilename != fElem->GetFileName() ) {
fDir = 0;
delete fFile; fFile = 0;
fFilename = fElem->GetFileName();
TDirectory *dirsave = gDirectory;
Double_t start = 0;
if (gPerfStats) start = TTimeStamp();
TFile::EFileType typ = TFile::kDefault;
TString fname = gEnv->GetValue("Path.Localroot","");
if (!fname.IsNull())
typ = TFile::GetType(fFilename, "", &fname);
if (typ != TFile::kLocal)
fname = fFilename;
fFile = TFile::Open(fname);
if (gPerfStats) {
gPerfStats->FileOpenEvent(fFile, fFilename, start);
fOldBytesRead = 0;
}
if (dirsave) dirsave->cd();
if (!fFile || fFile->IsZombie() ) {
if (fFile)
Error("Process","Cannot open file: %s (%s)",
fFilename.Data(), strerror(fFile->GetErrno()) );
else
Error("Process","Cannot open file: %s (errno unavailable)",
fFilename.Data());
return -1;
}
PDB(kLoop,2) Info("LoadDir","Opening file: %s", fFilename.Data() );
ret = 1;
}
if ( fDir == 0 || fPath != fElem->GetDirectory() ) {
TDirectory *dirsave = gDirectory;
fPath = fElem->GetDirectory();
if ( !fFile->cd(fPath) ) {
Error("Process","Cannot cd to: %s",
fPath.Data() );
return -1;
}
PDB(kLoop,2) Info("Process","Cd to: %s", fPath.Data() );
fDir = gDirectory;
if (dirsave) dirsave->cd();
ret = 1;
}
return ret;
}
ClassImp(TEventIterUnit)
TEventIterUnit::TEventIterUnit()
{
fDSet = 0;
fElem = 0;
fSel = 0;
fNum = 0;
fCurrent = 0;
fStop = kFALSE;
fOldBytesRead = 0;
}
TEventIterUnit::TEventIterUnit(TDSet* dset, TSelector *sel, Long64_t num)
{
fDSet = dset;
fElem = 0;
fSel = sel;
fNum = num;
fCurrent = 0;
fStop = kFALSE;
fOldBytesRead = 0;
}
Long64_t TEventIterUnit::GetNextEvent()
{
if (fStop || fNum == 0)
return -1;
if (fElem) fElem->ResetBit(TDSetElement::kNewPacket);
while (fElem == 0 || fCurrent == 0) {
if (gPerfStats) {
Long64_t totBytesWritten = TFile::GetFileBytesWritten();
Long64_t bytesWritten = totBytesWritten - fOldBytesRead;
PDB(kLoop, 2) Info("GetNextEvent", "bytes written: %lld", bytesWritten);
gPerfStats->SetBytesRead(bytesWritten);
fOldBytesRead = totBytesWritten;
}
SafeDelete(fElem);
if (!(fElem = fDSet->Next()))
return -1;
fElem->SetBit(TDSetElement::kNewPacket);
if (!fElem->TestBit(TDSetElement::kEmpty)) {
Error("GetNextEvent", "data element must be set to kEmtpy");
return -1;
}
fNum = fElem->GetNum();
if (!(fCurrent = fNum)) {
fNum = 0;
return -1;
}
fFirst = fElem->GetFirst();
}
Long64_t event = fNum - fCurrent + fFirst ;
--fCurrent;
return event;
}
ClassImp(TEventIterObj)
TEventIterObj::TEventIterObj()
{
fKeys = 0;
fNextKey = 0;
fObj = 0;
}
TEventIterObj::TEventIterObj(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
: TEventIter(dset,sel,first,num)
{
fClassName = dset->GetType();
fKeys = 0;
fNextKey = 0;
fObj = 0;
}
TEventIterObj::~TEventIterObj()
{
delete fNextKey;
delete fObj;
}
Long64_t TEventIterObj::GetNextEvent()
{
if (fStop || fNum == 0) return -1;
if (fElem) fElem->ResetBit(TDSetElement::kNewPacket);
while ( fElem == 0 || fElemNum == 0 || fCur < fFirst-1 ) {
if (gPerfStats && fFile) {
Long64_t bytesRead = fFile->GetBytesRead();
gPerfStats->SetBytesRead(bytesRead - fOldBytesRead);
fOldBytesRead = bytesRead;
}
SafeDelete(fElem);
fElem = fDSet->Next(fKeys->GetSize());
if (fElem && fElem->GetEntryList()) {
Error("GetNextEvent", "Entry- or event-list not available");
return -1;
}
if ( fElem == 0 ) {
fNum = 0;
return -1;
}
fElem->SetBit(TDSetElement::kNewPacket);
Int_t r = LoadDir();
if ( r == -1 ) {
fNum = 0;
return -1;
} else if ( r == 1 ) {
fKeys = fDir->GetListOfKeys();
fNextKey = new TIter(fKeys);
}
fElemFirst = fElem->GetFirst();
fElemNum = fElem->GetNum();
fEntryList = dynamic_cast<TEntryList *>(fElem->GetEntryList());
fEventList = (fEntryList) ? (TEventList *)0
: dynamic_cast<TEventList *>(fElem->GetEntryList());
fEventListPos = 0;
if (fEventList)
fElemNum = fEventList->GetN();
Long64_t num = fKeys->GetSize();
if ( fElemFirst > num ) {
Error("GetNextEvent","First (%lld) higher then number of keys (%lld) in %s",
fElemFirst, num, fElem->GetName());
fNum = 0;
return -1;
}
if ( fElemNum == -1 ) {
fElemNum = num - fElemFirst;
} else if ( fElemFirst+fElemNum > num ) {
Error("GetNextEvent","Num (%lld) + First (%lld) larger then number of keys (%lld) in %s",
fElemNum, fElemFirst, num, fElem->GetDirectory());
fElemNum = num - fElemFirst;
}
if ( fCur + fElemNum < fFirst ) {
fCur += fElemNum;
continue;
}
fNextKey->Reset();
for(fElemCur = -1; fElemCur < fElemFirst-1 ; fElemCur++, fNextKey->Next()) { }
}
--fElemNum;
++fElemCur;
--fNum;
++fCur;
TKey *key = (TKey*) fNextKey->Next();
TDirectory *dirsave = gDirectory;
fDir->cd();
fObj = key->ReadObj();
if (dirsave) dirsave->cd();
fSel->SetObject( fObj );
return fElemCur;
}
TEventIterTree::TFileTree::TFileTree(const char *name, TFile *f, Bool_t islocal)
: TNamed(name, ""), fUsed(kFALSE), fIsLocal(islocal), fFile(f)
{
fTrees = new TList;
fTrees->SetOwner();
}
TEventIterTree::TFileTree::~TFileTree()
{
fFile->SetCacheRead(0);
SafeDelete(fTrees);
SafeDelete(fFile);
}
ClassImp(TEventIterTree)
TEventIterTree::TEventIterTree()
{
fTree = 0;
fTreeCache = 0;
fUseTreeCache = 1;
fCacheSize = -1;
fTreeCacheIsLearning = kTRUE;
fUseParallelUnzip = 0;
fDontCacheFiles = kFALSE;
}
TEventIterTree::TEventIterTree(TDSet *dset, TSelector *sel, Long64_t first, Long64_t num)
: TEventIter(dset,sel,first,num)
{
fTreeName = dset->GetObjName();
fTree = 0;
fTreeCache = 0;
fTreeCacheIsLearning = kTRUE;
fFileTrees = new TList;
fFileTrees->SetOwner();
fUseTreeCache = gEnv->GetValue("ProofPlayer.UseTreeCache", 1);
fCacheSize = gEnv->GetValue("ProofPlayer.CacheSize", -1);
fUseParallelUnzip = gEnv->GetValue("ProofPlayer.UseParallelUnzip", 0);
if (fUseParallelUnzip) {
TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::kEnable);
} else {
TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::kDisable);
}
fDontCacheFiles = gEnv->GetValue("ProofPlayer.DontCacheFiles", 0);
}
TEventIterTree::~TEventIterTree()
{
SafeDelete(fTreeCache);
SafeDelete(fFileTrees);
}
Long64_t TEventIterTree::GetCacheSize()
{
if (fUseTreeCache) return fCacheSize;
return -1;
}
Int_t TEventIterTree::GetLearnEntries()
{
return TTreeCache::GetLearnEntries();
}
TTree* TEventIterTree::GetTrees(TDSetElement *elem)
{
TIter nxft(fFileTrees);
TFileTree *ft = 0;
while ((ft = (TFileTree *)nxft()))
ft->fUsed = kFALSE;
Bool_t localfile = kFALSE;
TTree* main = Load(elem, localfile);
if (main && main != fTree) {
if (fUseTreeCache) {
TFile *curfile = main->GetCurrentFile();
if (!fTreeCache) {
main->SetCacheSize(fCacheSize);
fTreeCache = (TTreeCache *)curfile->GetCacheRead();
if (fCacheSize < 0) fCacheSize = main->GetCacheSize();
} else {
curfile->SetCacheRead(fTreeCache);
fTreeCache->UpdateBranches(main, kTRUE);
}
if (fTreeCache) {
fTreeCacheIsLearning = fTreeCache->IsLearning();
if (fTreeCacheIsLearning)
Info("GetTrees","the tree cache is in learning phase");
}
} else {
main->SetCacheSize(0);
}
}
Bool_t loc = kFALSE;
TList *friends = elem->GetListOfFriends();
if (friends) {
TIter nxf(friends);
TDSetElement *dse = 0;
while ((dse = (TDSetElement *) nxf())) {
TUrl uf(dse->GetName());
TString uo(uf.GetOptions()), alias;
Int_t from = kNPOS;
if ((from = uo.Index("friend_alias=")) != kNPOS) {
from += strlen("friend_alias=");
if (!uo.Tokenize(alias, from, "|"))
Warning("GetTrees", "empty 'friend_alias' found for tree friend");
uo.ReplaceAll(TString::Format("friend_alias=%s|", alias.Data()), "");
uf.SetOptions(uo);
dse->SetName(uf.GetUrl());
}
TTree *friendTree = Load(dse, loc);
if (friendTree && main) {
Bool_t addfriend = kTRUE;
TList *frnds = main->GetListOfFriends();
if (frnds) {
TIter xnxf(frnds);
TFriendElement *fe = 0;
while ((fe = (TFriendElement *) xnxf())) {
if (fe->GetTree() == friendTree) {
addfriend = kFALSE;
break;
}
}
}
if (addfriend) {
if (alias.IsNull())
main->AddFriend(friendTree);
else
main->AddFriend(friendTree, alias);
}
} else {
return 0;
}
}
}
nxft.Reset();
while ((ft = (TFileTree *)nxft())) {
if (!(ft->fUsed)) {
fFileTrees->Remove(ft);
delete ft;
} else {
}
}
return main;
}
TTree* TEventIterTree::Load(TDSetElement *e, Bool_t &localfile)
{
if (!e) {
Error("Load", "undefined element");
return (TTree *)0;
}
const char *fn = e->GetFileName();
const char *dn = e->GetDirectory();
const char *tn = e->GetObjName();
TFile *f = 0;
TString names(fn);
TString name;
Ssiz_t from = 0;
TFileTree *ft = 0;
while (names.Tokenize(name,from,"|")) {
TString key(TUrl(name).GetFileAndOptions());
if ((ft = (TFileTree *) fFileTrees->FindObject(key.Data()))) {
f = ft->fFile;
break;
}
}
if (!f) {
TFile::EFileType typ = TFile::kDefault;
TString fname = gEnv->GetValue("Path.Localroot","");
if (!fname.IsNull())
typ = TFile::GetType(fn, "", &fname);
if (typ != TFile::kLocal) {
fname = fn;
} else {
localfile = kTRUE;
}
f = TFile::Open(fname);
if (!f) {
Error("Load","file '%s' ('%s') could not be open", fn, fname.Data());
return (TTree *)0;
}
#if defined(R__MACOSX)
if (fDontCacheFiles && localfile) {
fcntl(f->GetFd(), F_NOCACHE, 1);
}
#endif
ft = new TFileTree(TUrl(f->GetName()).GetFileAndOptions(), f, localfile);
fFileTrees->Add(ft);
} else {
localfile = ft->fIsLocal;
}
if (ft && ft->fTrees->GetSize() > 0) {
TTree *t = 0;
if (!strcmp(tn, "*"))
t = (TTree *) ft->fTrees->First();
else
t = (TTree *) ft->fTrees->FindObject(tn);
if (t) {
ft->fUsed = kTRUE;
return t;
}
}
TDirectory *dd = f;
if (dn && !(dd = f->GetDirectory(dn))) {
Error("Load","Cannot get to: %s", dn);
return (TTree *)0;
}
PDB(kLoop,2)
Info("Load","got directory: %s", dn);
TString on(tn);
TString sreg(tn);
if (sreg.Length() <= 0 || sreg == "" || sreg.Contains("*")) {
if (sreg.Contains("*"))
sreg.ReplaceAll("*", ".*");
else
sreg = ".*";
TRegexp re(sreg);
if (dd->GetListOfKeys()) {
TIter nxk(dd->GetListOfKeys());
TKey *k = 0;
while ((k = (TKey *) nxk())) {
if (!strcmp(k->GetClassName(), "TTree")) {
TString kn(k->GetName());
if (kn.Index(re) != kNPOS) {
on = kn;
break;
}
}
}
}
}
TKey *key = dd->GetKey(on);
if (key == 0) {
Error("Load", "Cannot find tree \"%s\" in %s", tn, fn);
return (TTree*)0;
}
PDB(kLoop,2) Info("Load", "Reading: %s", tn);
TTree *tree = dynamic_cast<TTree*> (key->ReadObj());
dd->cd();
if (tree == 0) {
Error("Load", "Cannot <dynamic_cast> obj to tree \"%s\"", tn);
return (TTree*)0;
}
ft->fTrees->Add(tree);
ft->fUsed = kTRUE;
PDB(kLoop,2)
Info("Load","TFileTree for '%s' flagged as 'in-use' ...", ft->GetName());
return tree;
}
Long64_t TEventIterTree::GetNextEvent()
{
if (fStop || fNum == 0) return -1;
Bool_t attach = kFALSE;
Bool_t corrupted = (fElem && fElem->TestBit(TDSetElement::kCorrupted)) ? kTRUE : kFALSE;
if (fElem) fElem->ResetBit(TDSetElement::kNewPacket);
while ( fElem == 0 || fElemNum == 0 || fCur < fFirst-1 || corrupted) {
if (gPerfStats && fTree) {
Long64_t totBytesRead = fTree->GetCurrentFile()->GetBytesRead();
Long64_t bytesRead = totBytesRead - fOldBytesRead;
gPerfStats->SetBytesRead(bytesRead);
fOldBytesRead = totBytesRead;
}
Long64_t rest = -1;
if (fElem) {
rest = fElem->GetNum();
if (fElemCur >= 0) rest -= (fElemCur + 1 - fElemFirst);
}
SafeDelete(fElem);
while (!fElem) {
if (corrupted) {
fElem = fDSet->Next(rest);
} else if (fTree) {
fElem = fDSet->Next(fTree->GetEntries());
} else {
fElem = fDSet->Next();
}
if (!fElem) {
fNum = 0;
return -1;
}
corrupted = kFALSE;
fElem->SetBit(TDSetElement::kNewPacket);
fElem->ResetBit(TDSetElement::kCorrupted);
TTree *newTree = GetTrees(fElem);
if (newTree) {
if (newTree != fTree) {
fTree = newTree;
attach = kTRUE;
fOldBytesRead = fTree->GetCurrentFile()->GetBytesRead();
}
if (fTreeCache)
fTreeCache->SetEntryRange(fElem->GetFirst(),
fElem->GetFirst() + fElem->GetNum() - 1);
} else {
SafeDelete(fElem);
fTree = 0;
}
}
fElemFirst = fElem->GetFirst();
fElemNum = fElem->GetNum();
fEntryList = dynamic_cast<TEntryList *>(fElem->GetEntryList());
fEventList = (fEntryList) ? (TEventList *)0
: dynamic_cast<TEventList *>(fElem->GetEntryList());
fEntryListPos = fElemFirst;
fEventListPos = 0;
if (fEntryList)
fElemNum = fEntryList->GetEntriesToProcess();
else if (fEventList)
fElemNum = fEventList->GetN();
Long64_t num = (Long64_t) fTree->GetEntries();
if (!fEntryList && !fEventList) {
if ( fElemFirst > num ) {
Error("GetNextEvent", "first (%lld) higher then number of entries (%lld) in %s",
fElemFirst, num, fElem->GetObjName());
fNum = 0;
return -1;
}
if ( fElemNum == -1 ) {
fElemNum = num - fElemFirst;
} else if ( fElemFirst+fElemNum > num ) {
Error("GetNextEvent", "num (%lld) + first (%lld) larger then number of entries (%lld) in %s",
fElemNum, fElemFirst, num, fElem->GetName());
fElemNum = num - fElemFirst;
}
if ( fCur + fElemNum < fFirst ) {
fCur += fElemNum;
continue;
}
fElemCur = fElemFirst-1;
}
}
if ( attach ) {
PDB(kLoop,1) Info("GetNextEvent", "call Init(%p) and Notify()",fTree);
fSel->Init(fTree);
fSel->Notify();
TIter next(fSel->GetOutputList());
TEntryList *elist=0;
while ((elist=(TEntryList*)next())){
if (elist->InheritsFrom(TEntryList::Class()))
elist->SetTree(fTree->GetName(), fElem->GetFileName());
}
if (fSel->GetAbort() == TSelector::kAbortProcess) {
return -1;
}
attach = kFALSE;
}
Long64_t rv;
if (fEntryList){
--fElemNum;
rv = fEntryList->GetEntry(fEntryListPos);
fEntryListPos++;
} else if (fEventList) {
--fElemNum;
rv = fEventList->GetEntry(fEventListPos);
fEventListPos++;
} else {
--fElemNum;
++fElemCur;
--fNum;
++fCur;
rv = fElemCur;
}
if (fTreeCache && fTreeCacheIsLearning) {
if (!(fTreeCache->IsLearning())) {
fTreeCacheIsLearning = kFALSE;
if (gProofServ) gProofServ->RestartComputeTime();
}
}
fTree->LoadTree(rv);
return rv;
}