#include "TTreeCacheUnzip.h"
#include "TChain.h"
#include "TBranch.h"
#include "TFile.h"
#include "TEventList.h"
#include "TVirtualMutex.h"
#include "TThread.h"
#include "TCondition.h"
#include "TMath.h"
#include "Bytes.h"
#include "TEnv.h"
#define THREADCNT 2
extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;
Double_t TTreeCacheUnzip::fgRelBuffSize = .5;
ClassImp(TTreeCacheUnzip)
TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
fActiveThread(kFALSE),
fAsyncReading(kFALSE),
fCycle(0),
fLastReadPos(0),
fBlocksToGo(0),
fUnzipLen(0),
fUnzipChunks(0),
fUnzipStatus(0),
fTotalUnzipBytes(0),
fNseekMax(0),
fUnzipBufferSize(0),
fNUnzip(0),
fNFound(0),
fNStalls(0),
fNMissed(0)
{
Init();
}
TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
fActiveThread(kFALSE),
fAsyncReading(kFALSE),
fCycle(0),
fLastReadPos(0),
fBlocksToGo(0),
fUnzipLen(0),
fUnzipChunks(0),
fUnzipStatus(0),
fTotalUnzipBytes(0),
fNseekMax(0),
fUnzipBufferSize(0),
fNUnzip(0),
fNFound(0),
fNStalls(0),
fNMissed(0)
{
Init();
}
void TTreeCacheUnzip::Init()
{
fMutexList = new TMutex(kTRUE);
fIOMutex = new TMutex(kTRUE);
fUnzipStartCondition = new TCondition(fMutexList);
fUnzipDoneCondition = new TCondition(fMutexList);
fTotalUnzipBytes = 0;
fCompBuffer = new char[16384];
fCompBufferSize = 16384;
if (fgParallel == kDisable) {
fParallel = kFALSE;
}
else if(fgParallel == kEnable || fgParallel == kForce) {
SysInfo_t info;
gSystem->GetSysInfo(&info);
fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
if(gDebug > 0)
Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
fParallel = kTRUE;
for (Int_t i = 0; i < 10; i++) fUnzipThread[i] = 0;
StartThreadUnzip(THREADCNT);
}
else {
Warning("TTreeCacheUnzip", "Parallel Option unknown");
}
if (gEnv->GetValue("TFile.AsyncReading", 1)) {
if (fFile && !(fFile->ReadBufferAsync(0, 0)))
fAsyncReading = kTRUE;
}
}
TTreeCacheUnzip::~TTreeCacheUnzip()
{
ResetCache();
if (IsActiveThread())
StopThreadUnzip();
delete [] fUnzipLen;
delete fUnzipStartCondition;
delete fUnzipDoneCondition;
delete fMutexList;
delete fIOMutex;
delete [] fUnzipStatus;
delete [] fUnzipChunks;
}
void TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches )
{
R__LOCKGUARD(fMutexList);
TTreeCache::AddBranch(b, subbranches);
}
void TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches )
{
R__LOCKGUARD(fMutexList);
TTreeCache::AddBranch(branch, subbranches);
}
Bool_t TTreeCacheUnzip::FillBuffer()
{
{
R__LOCKGUARD(fMutexList);
fIsTransferred = kFALSE;
if (fNbranches <= 0) return kFALSE;
TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
Long64_t entry = tree->GetReadEntry();
if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
if (entry == -1) entry=0;
Long64_t autoFlush = tree->GetAutoFlush();
if (autoFlush > 0) {
Int_t averageEntrySize = tree->GetZipBytes()/tree->GetEntries();
Int_t nauto = fBufferSizeMin/(averageEntrySize*autoFlush);
if (nauto < 1) nauto = 1;
fEntryNext = entry - entry%autoFlush + nauto*autoFlush;
} else {
if (fZipBytes==0) {
fEntryNext = entry + tree->GetEntries();;
} else {
fEntryNext = entry + tree->GetEntries()*fBufferSizeMin/fZipBytes;
}
}
if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
if (fEntryNext > fEntryMax) fEntryNext = fEntryMax+1;
fEntryCurrent = entry;
TEventList *elist = fOwner->GetEventList();
Long64_t chainOffset = 0;
if (elist) {
if (fOwner->IsA() ==TChain::Class()) {
TChain *chain = (TChain*)fOwner;
Int_t t = chain->GetTreeNumber();
chainOffset = chain->GetTreeOffset()[t];
}
}
TFileCacheRead::Prefetch(0,0);
for (Int_t i=0;i<fNbranches;i++) {
TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
if (b->GetDirectory()==0) continue;
if (b->GetDirectory()->GetFile() != fFile) continue;
Int_t nb = b->GetMaxBaskets();
Int_t *lbaskets = b->GetBasketBytes();
Long64_t *entries = b->GetBasketEntry();
if (!lbaskets || !entries) continue;
Int_t blistsize = b->GetListOfBaskets()->GetSize();
for (Int_t j=0;j<nb;j++) {
if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
Long64_t pos = b->GetBasketSeek(j);
Int_t len = lbaskets[j];
if (pos <= 0 || len <= 0) continue;
if (entries[j] >= fEntryNext) continue;
if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry)) continue;
if (elist) {
Long64_t emax = fEntryMax;
if (j<nb-1) emax = entries[j+1]-1;
if (!elist->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
}
fNReadPref++;
TFileCacheRead::Prefetch(pos,len);
}
if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
}
ResetCache();
fIsLearning = kFALSE;
}
return kTRUE;
}
void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
{
R__LOCKGUARD(fMutexList);
TTreeCache::SetEntryRange(emin, emax);
}
void TTreeCacheUnzip::StopLearningPhase()
{
R__LOCKGUARD(fMutexList);
TTreeCache::StopLearningPhase();
}
void TTreeCacheUnzip::UpdateBranches(TTree *tree, Bool_t owner)
{
R__LOCKGUARD(fMutexList);
TTreeCache::UpdateBranches(tree, owner);
}
TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
{
return fgParallel;
}
Bool_t TTreeCacheUnzip::IsParallelUnzip()
{
if (fgParallel == kEnable || fgParallel == kForce)
return kTRUE;
return kFALSE;
}
Bool_t TTreeCacheUnzip::IsActiveThread()
{
R__LOCKGUARD(fMutexList);
return fActiveThread;
}
Bool_t TTreeCacheUnzip::IsQueueEmpty()
{
R__LOCKGUARD(fMutexList);
if ( fIsLearning )
return kTRUE;
return kFALSE;
}
void TTreeCacheUnzip::WaitUnzipStartSignal()
{
fUnzipStartCondition->TimedWaitRelative(2000);
}
void TTreeCacheUnzip::SendUnzipStartSignal(Bool_t broadcast)
{
if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
if (broadcast)
fUnzipStartCondition->Broadcast();
else
fUnzipStartCondition->Signal();
}
Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
{
if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
fgParallel = option;
return 1;
}
return 0;
}
struct TTreeCacheUnzipData {
TTreeCacheUnzip *inst;
Int_t cnt;
};
Int_t TTreeCacheUnzip::StartThreadUnzip(Int_t nthreads)
{
Int_t nt = nthreads;
if (nt > 10) nt = 10;
if (gDebug > 0)
Info("StartThreadUnzip", "Going to start %d threads.", nt);
for (Int_t i = 0; i < nt; i++) {
if (!fUnzipThread[i]) {
TString nm("UnzipLoop");
nm += i;
if (gDebug > 0)
Info("StartThreadUnzip", "Going to start thread '%s'", nm.Data());
TTreeCacheUnzipData *d = new TTreeCacheUnzipData;
d->inst = this;
d->cnt = i;
fUnzipThread[i] = new TThread(nm.Data(), UnzipLoop, (void*)d);
if (!fUnzipThread[i])
Error("TTreeCacheUnzip::StartThreadUnzip", " Unable to create new thread.");
fUnzipThread[i]->Run();
fActiveThread=kTRUE;
}
}
return (fActiveThread == kTRUE);
}
Int_t TTreeCacheUnzip::StopThreadUnzip()
{
fActiveThread = kFALSE;
for (Int_t i = 0; i < 1; i++) {
if(fUnzipThread[i]){
SendUnzipStartSignal(kTRUE);
if (fUnzipThread[i]->Exists()) {
fUnzipThread[i]->Join();
delete fUnzipThread[i];
}
}
}
return 1;
}
void* TTreeCacheUnzip::UnzipLoop(void *arg)
{
TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
TTreeCacheUnzip *unzipMng = d->inst;
TThread::SetCancelOn();
TThread::SetCancelDeferred();
Int_t thrnum = d->cnt;
Int_t startindex = thrnum;
Int_t locbuffsz = 16384;
char *locbuff = new char[16384];
Int_t res = 0;
Int_t myCycle = 0;
while( unzipMng->IsActiveThread() ) {
res = 1;
{
R__LOCKGUARD(unzipMng->fMutexList);
if (myCycle != unzipMng->fCycle) startindex = thrnum;
myCycle = unzipMng->fCycle;
if (unzipMng->fNseek) startindex = startindex % unzipMng->fNseek;
else startindex = -1;
}
if (startindex >= 0)
res = unzipMng->UnzipCache(startindex, locbuffsz, locbuff);
{
R__LOCKGUARD(unzipMng->fMutexList);
if(!unzipMng->IsActiveThread()) break;
if ((res == 1) || (!unzipMng->fIsTransferred)) {
unzipMng->WaitUnzipStartSignal();
startindex = unzipMng->fLastReadPos+3+thrnum;
}
}
}
delete d;
delete [] locbuff;
return (void *)0;
}
Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
{
Version_t versionkey;
Short_t klen;
UInt_t datime;
Int_t nb = 0,olen;
Int_t nread = maxbytes;
frombuf(buf,&nb);
nbytes = nb;
if (nb < 0) return nread;
const Int_t headerSize = 16;
if (nread < headerSize) return nread;
frombuf(buf, &versionkey);
frombuf(buf, &olen);
frombuf(buf, &datime);
frombuf(buf, &klen);
if (!olen) olen = nbytes-klen;
objlen = olen;
keylen = klen;
return nread;
}
void TTreeCacheUnzip::ResetCache()
{
{
R__LOCKGUARD(fMutexList);
if (gDebug > 0)
Info("ResetCache", "Thread: %ld -- Resetting the cache. fNseek:%d fNSeekMax:%d fTotalUnzipBytes:%lld", TThread::SelfId(), fNseek, fNseekMax, fTotalUnzipBytes);
fCycle++;
for (Int_t i = 0; i < fNseekMax; i++) {
if (fUnzipLen) fUnzipLen[i] = 0;
if (fUnzipChunks) {
if (fUnzipChunks[i]) delete [] fUnzipChunks[i];
fUnzipChunks[i] = 0;
}
if (fUnzipStatus) fUnzipStatus[i] = 0;
}
while (fActiveBlks.size()) fActiveBlks.pop();
if(fNseekMax < fNseek){
if (gDebug > 0)
Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
Byte_t *aUnzipStatus = new Byte_t[fNseek];
memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
Int_t *aUnzipLen = new Int_t[fNseek];
memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
char **aUnzipChunks = new char *[fNseek];
memset(aUnzipChunks, 0, fNseek*sizeof(char *));
if (fUnzipStatus) delete [] fUnzipStatus;
if (fUnzipLen) delete [] fUnzipLen;
if (fUnzipChunks) delete [] fUnzipChunks;
fUnzipStatus = aUnzipStatus;
fUnzipLen = aUnzipLen;
fUnzipChunks = aUnzipChunks;
fNseekMax = fNseek;
}
fLastReadPos = 0;
fTotalUnzipBytes = 0;
fBlocksToGo = fNseek;
}
SendUnzipStartSignal(kTRUE);
}
Int_t TTreeCacheUnzip::GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
{
Int_t res = 0;
Int_t loc = -1;
{
R__LOCKGUARD(fMutexList);
Int_t myCycle = fCycle;
if (fParallel && !fIsLearning) {
if(fNseekMax < fNseek){
if (gDebug > 0)
Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
Byte_t *aUnzipStatus = new Byte_t[fNseek];
memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
Int_t *aUnzipLen = new Int_t[fNseek];
memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
char **aUnzipChunks = new char *[fNseek];
memset(aUnzipChunks, 0, fNseek*sizeof(char *));
for (Int_t i = 0; i < fNseekMax; i++) {
aUnzipStatus[i] = fUnzipStatus[i];
aUnzipLen[i] = fUnzipLen[i];
aUnzipChunks[i] = fUnzipChunks[i];
}
if (fUnzipStatus) delete [] fUnzipStatus;
if (fUnzipLen) delete [] fUnzipLen;
if (fUnzipChunks) delete [] fUnzipChunks;
fUnzipStatus = aUnzipStatus;
fUnzipLen = aUnzipLen;
fUnzipChunks = aUnzipChunks;
fNseekMax = fNseek;
}
loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
if ( (fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc]) ) {
Int_t seekidx = fSeekIndex[loc];
fLastReadPos = seekidx;
do {
if ((fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0)) {
if(!(*buf)) {
*buf = fUnzipChunks[seekidx];
fUnzipChunks[seekidx] = 0;
fTotalUnzipBytes -= fUnzipLen[seekidx];
SendUnzipStartSignal(kFALSE);
*free = kTRUE;
}
else {
memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
delete fUnzipChunks[seekidx];
fTotalUnzipBytes -= fUnzipLen[seekidx];
fUnzipChunks[seekidx] = 0;
SendUnzipStartSignal(kFALSE);
*free = kFALSE;
}
fNFound++;
return fUnzipLen[seekidx];
}
if ( fUnzipStatus[seekidx] == 1 ) {
fUnzipDoneCondition->TimedWaitRelative(200);
if ( myCycle != fCycle ) {
if (gDebug > 0)
Info("GetUnzipBuffer", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
IsActiveThread(), fNseek, fIsLearning);
fLastReadPos = 0;
seekidx = -1;
break;
}
}
} while ( fUnzipStatus[seekidx] == 1 );
if ( (seekidx >= 0) && (fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0) ) {
if(!(*buf)) {
*buf = fUnzipChunks[seekidx];
fUnzipChunks[seekidx] = 0;
fTotalUnzipBytes -= fUnzipLen[seekidx];
SendUnzipStartSignal(kFALSE);
*free = kTRUE;
}
else {
memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
delete fUnzipChunks[seekidx];
fTotalUnzipBytes -= fUnzipLen[seekidx];
fUnzipChunks[seekidx] = 0;
SendUnzipStartSignal(kFALSE);
*free = kFALSE;
}
fNStalls++;
return fUnzipLen[seekidx];
}
else {
fUnzipStatus[seekidx] = 2;
fUnzipChunks[seekidx] = 0;
if ((fTotalUnzipBytes < fUnzipBufferSize) && fBlocksToGo)
SendUnzipStartSignal(kFALSE);
}
} else {
loc = -1;
fIsTransferred = kFALSE;
}
} else {
}
}
if (len > fCompBufferSize) {
delete [] fCompBuffer;
fCompBuffer = new char[len];
fCompBufferSize = len;
} else {
if (fCompBufferSize > len*4) {
delete [] fCompBuffer;
fCompBuffer = new char[len*2];
fCompBufferSize = len*2;
}
}
{
R__LOCKGUARD(fIOMutex);
res = 0;
if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
fFile->Seek(pos);
res = fFile->ReadBuffer(fCompBuffer, len);
}
if (res) res = -1;
}
if (!res) {
res = UnzipBuffer(buf, fCompBuffer);
*free = kTRUE;
}
if (!fIsLearning) {
fNMissed++;
}
return res;
}
void TTreeCacheUnzip::SetUnzipRelBufferSize(Float_t relbufferSize)
{
fgRelBuffSize = relbufferSize;
}
void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
{
R__LOCKGUARD(fMutexList);
fUnzipBufferSize = bufferSize;
}
Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
{
Int_t uzlen = 0;
Bool_t alloc = kFALSE;
const Int_t hlen=128;
Int_t nbytes=0, objlen=0, keylen=0;
GetRecordHeader(src, hlen, nbytes, objlen, keylen);
if (!(*dest)) {
UChar_t *bufcur = (UChar_t *) (src + keylen);
Int_t nin, nbuf;
if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
uzlen = -1;
return uzlen;
}
Int_t l = keylen+objlen;
*dest = new char[l];
alloc = kTRUE;
}
Bool_t oldCase = objlen==nbytes-keylen
&& ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
&& fFile->GetVersion()<=30401;
if (objlen > nbytes-keylen || oldCase) {
memcpy(*dest, src, keylen);
uzlen += keylen;
char *objbuf = *dest + keylen;
UChar_t *bufcur = (UChar_t *) (src + keylen);
Int_t nin, nbuf;
Int_t nout = 0;
Int_t noutot = 0;
while (1) {
Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
if (hc!=0) break;
if (gDebug > 2)
Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
if (oldCase && (nin > objlen || nbuf > objlen)) {
if (gDebug > 2)
Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
memcpy( *dest + keylen, src + keylen, objlen);
uzlen += objlen;
return uzlen;
}
R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
if (gDebug > 2)
Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
nin, bufcur, nbuf, objbuf, nout);
if (!nout) break;
noutot += nout;
if (noutot >= objlen) break;
bufcur += nin;
objbuf += nout;
}
if (noutot != objlen) {
Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
nbytes,keylen,objlen, noutot,nout,nin,nbuf);
uzlen = -1;
if(alloc) delete [] *dest;
*dest = 0;
return uzlen;
}
uzlen += objlen;
} else {
memcpy(*dest, src, keylen);
uzlen += keylen;
memcpy(*dest + keylen, src + keylen, objlen);
uzlen += objlen;
}
return uzlen;
}
Int_t TTreeCacheUnzip::UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
{
Int_t myCycle;
const Int_t hlen=128;
Int_t objlen=0, keylen=0;
Int_t nbytes=0;
Int_t readbuf = 0;
Int_t idxtounzip = -1;
Long64_t rdoffs = 0;
Int_t rdlen = 0;
{
R__LOCKGUARD(fMutexList);
if (!IsActiveThread() || !fNseek || fIsLearning || !fIsTransferred) {
if (gDebug > 0)
Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
IsActiveThread(), fNseek, fIsLearning);
return 1;
}
myCycle = fCycle;
idxtounzip = -1;
rdoffs = 0;
rdlen = 0;
if (fTotalUnzipBytes < fUnzipBufferSize) {
if (fBlocksToGo > 0) {
for (Int_t ii=0; ii < fNseek; ii++) {
Int_t reqi = (startindex+ii) % fNseek;
if (!fUnzipStatus[reqi] && (fSeekLen[reqi] > 256) ) {
fUnzipStatus[reqi] = 1;
idxtounzip = reqi;
rdoffs = fSeek[idxtounzip];
rdlen = fSeekLen[idxtounzip];
break;
}
}
if (idxtounzip < 0) fBlocksToGo = 0;
}
}
}
if (idxtounzip < 0) {
if (gDebug > 0)
Info("UnzipCache", "Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
startindex, fTotalUnzipBytes, fUnzipBufferSize, fNseek );
return 1;
}
startindex = idxtounzip+THREADCNT;
if (!IsActiveThread() || !fNseek || fIsLearning ) {
if (gDebug > 0)
Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
IsActiveThread(), fNseek, fIsLearning);
return 1;
}
Int_t loc = -1;
if(locbuffsz < rdlen) {
if (locbuff) delete [] locbuff;
locbuffsz = rdlen;
locbuff = new char[locbuffsz];
} else
if(locbuffsz > rdlen*3) {
if (locbuff) delete [] locbuff;
locbuffsz = rdlen*2;
locbuff = new char[locbuffsz];
}
if (gDebug > 0)
Info("UnzipCache", "Going to unzip block %d", idxtounzip);
readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
{
R__LOCKGUARD(fMutexList);
if ( (myCycle != fCycle) || !fIsTransferred ) {
if (gDebug > 0)
Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
IsActiveThread(), fNseek, fIsLearning);
fUnzipStatus[idxtounzip] = 2;
fUnzipChunks[idxtounzip] = 0;
fUnzipLen[idxtounzip] = 0;
fUnzipDoneCondition->Signal();
startindex = 0;
return 1;
}
if (readbuf <= 0) {
fUnzipStatus[idxtounzip] = 2;
fUnzipChunks[idxtounzip] = 0;
fUnzipLen[idxtounzip] = 0;
if (gDebug > 0)
Info("UnzipCache", "Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
return -1;
}
GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
if (len > 4*fUnzipBufferSize) {
Info("UnzipCache", "Block %d is too big, skipping.", idxtounzip);
fUnzipStatus[idxtounzip] = 2;
fUnzipChunks[idxtounzip] = 0;
fUnzipLen[idxtounzip] = 0;
fUnzipDoneCondition->Signal();
return 0;
}
}
char *ptr = 0;
Int_t loclen = 0;
loclen = UnzipBuffer(&ptr, locbuff);
if ((loclen > 0) && (loclen == objlen+keylen)) {
R__LOCKGUARD(fMutexList);
if ( (myCycle != fCycle) || !fIsTransferred) {
if (gDebug > 0)
Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
IsActiveThread(), fNseek, fIsLearning);
delete [] ptr;
fUnzipStatus[idxtounzip] = 2;
fUnzipChunks[idxtounzip] = 0;
fUnzipLen[idxtounzip] = 0;
startindex = 0;
fUnzipDoneCondition->Signal();
return 1;
}
fUnzipStatus[idxtounzip] = 2;
fUnzipChunks[idxtounzip] = ptr;
fUnzipLen[idxtounzip] = loclen;
fTotalUnzipBytes += loclen;
fActiveBlks.push(idxtounzip);
if (gDebug > 0)
Info("UnzipCache", "reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
idxtounzip, rdoffs, rdlen, loclen);
fNUnzip++;
}
else {
R__LOCKGUARD(fMutexList);
Info("argh", "loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
fUnzipStatus[idxtounzip] = 2;
fUnzipChunks[idxtounzip] = 0;
fUnzipLen[idxtounzip] = 0;
}
fUnzipDoneCondition->Signal();
delete [] ptr;
return 0;
}
void TTreeCacheUnzip::Print(Option_t* option) const {
printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
printf("Number of hits: %d\n", fNFound);
printf("Number of stalls: %d\n", fNStalls);
printf("Number of misses: %d\n", fNMissed);
TTreeCache::Print(option);
}
Int_t TTreeCacheUnzip::ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) {
R__LOCKGUARD(fIOMutex);
return TTreeCache::ReadBufferExt(buf, pos, len, loc);
}