#include "TEnv.h"
#include "TFile.h"
#include "TFileCacheRead.h"
#include "TFileCacheWrite.h"
#include "TFilePrefetch.h"
#include "TMath.h"
ClassImp(TFileCacheRead)
TFileCacheRead::TFileCacheRead() : TObject()
{
fBufferSizeMin = 0;
fBufferSize = 0;
fBufferLen = 0;
fBytesRead = 0;
fNoCacheBytesRead = 0;
fBytesReadExtra = 0;
fReadCalls = 0;
fNoCacheReadCalls = 0;
fNseek = 0;
fNtot = 0;
fNb = 0;
fSeekSize = 0;
fSeek = 0;
fSeekIndex = 0;
fSeekSort = 0;
fPos = 0;
fSeekLen = 0;
fSeekSortLen = 0;
fSeekPos = 0;
fLen = 0;
fFile = 0;
fBuffer = 0;
fIsSorted = kFALSE;
fIsTransferred = kFALSE;
fBNseek = 0;
fBNtot = 0;
fBNb = 0;
fBSeekSize = 0;
fBSeek = 0;
fBSeekSort = 0;
fBSeekIndex = 0;
fBPos = 0;
fBSeekLen = 0;
fBSeekSortLen = 0;
fBSeekPos = 0;
fBLen = 0;
fBIsSorted = kFALSE;
fBIsTransferred=kFALSE;
fAsyncReading = kFALSE;
fEnablePrefetching = kFALSE;
fPrefetch = 0;
fPrefetchedBlocks= 0;
}
TFileCacheRead::TFileCacheRead(TFile *file, Int_t buffersize, TObject *tree)
: TObject()
{
if (buffersize <=10000) fBufferSize = 100000;
else fBufferSize = buffersize;
fBufferSizeMin = fBufferSize;
fBufferLen = 0;
fBytesRead = 0;
fNoCacheBytesRead = 0;
fBytesReadExtra = 0;
fReadCalls = 0;
fNoCacheReadCalls = 0;
fNseek = 0;
fNtot = 0;
fNb = 0;
fSeekSize = 10000;
fSeek = new Long64_t[fSeekSize];
fSeekIndex = new Int_t[fSeekSize];
fSeekSort = new Long64_t[fSeekSize];
fPos = new Long64_t[fSeekSize];
fSeekLen = new Int_t[fSeekSize];
fSeekSortLen = new Int_t[fSeekSize];
fSeekPos = new Int_t[fSeekSize];
fLen = new Int_t[fSeekSize];
fFile = file;
fBNseek = 0;
fBNtot = 0;
fBNb = 0;
fBSeekSize = 10000;
fBSeek = new Long64_t[fBSeekSize];
fBSeekIndex = new Int_t[fBSeekSize];
fBSeekSort = new Long64_t[fBSeekSize];
fBPos = new Long64_t[fBSeekSize];
fBSeekLen = new Int_t[fBSeekSize];
fBSeekSortLen = new Int_t[fBSeekSize];
fBSeekPos = new Int_t[fBSeekSize];
fBLen = new Int_t[fBSeekSize];
fBuffer = 0;
fPrefetch = 0;
fPrefetchedBlocks = 0;
fEnablePrefetching = gEnv->GetValue("TFile.AsyncPrefetching", 0);
if (fEnablePrefetching && strcmp(file->GetEndpointUrl()->GetProtocol(), "file")){
SetEnablePrefetchingImpl(true);
}
else {
SetEnablePrefetchingImpl(false);
}
fIsSorted = kFALSE;
fIsTransferred = kFALSE;
fBIsSorted = kFALSE;
fBIsTransferred = kFALSE;
if (file) file->SetCacheRead(this, tree);
}
TFileCacheRead::~TFileCacheRead()
{
SafeDelete(fPrefetch);
delete [] fSeek;
delete [] fSeekIndex;
delete [] fSeekSort;
delete [] fPos;
delete [] fSeekLen;
delete [] fSeekSortLen;
delete [] fSeekPos;
delete [] fLen;
if (fBuffer)
delete [] fBuffer;
delete [] fBSeek;
delete [] fBSeekIndex;
delete [] fBSeekSort;
delete [] fBPos;
delete [] fBSeekLen;
delete [] fBSeekSortLen;
delete [] fBSeekPos;
delete [] fBLen;
}
void TFileCacheRead::Close(Option_t * )
{
if (fPrefetch) {
delete fPrefetch;
fPrefetch = 0;
}
}
void TFileCacheRead::Prefetch(Long64_t pos, Int_t len)
{
fIsSorted = kFALSE;
fIsTransferred = kFALSE;
if (pos <= 0) {
fNseek = 0;
fNtot = 0;
return;
}
if (fNseek >= fSeekSize) {
fSeekSize *= 2;
Long64_t *aSeek = new Long64_t[fSeekSize];
Int_t *aSeekIndex = new Int_t[fSeekSize];
Long64_t *aSeekSort = new Long64_t[fSeekSize];
Long64_t *aPos = new Long64_t[fSeekSize];
Int_t *aSeekLen = new Int_t[fSeekSize];
Int_t *aSeekSortLen = new Int_t[fSeekSize];
Int_t *aSeekPos = new Int_t[fSeekSize];
Int_t *aLen = new Int_t[fSeekSize];
for (Int_t i=0;i<fNseek;i++) {
aSeek[i] = fSeek[i];
aSeekIndex[i] = fSeekIndex[i];
aSeekSort[i] = fSeekSort[i];
aPos[i] = fPos[i];
aSeekLen[i] = fSeekLen[i];
aSeekSortLen[i] = fSeekSortLen[i];
aSeekPos[i] = fSeekPos[i];
aLen[i] = fLen[i];
}
delete [] fSeek;
delete [] fSeekIndex;
delete [] fSeekSort;
delete [] fPos;
delete [] fSeekLen;
delete [] fSeekSortLen;
delete [] fSeekPos;
delete [] fLen;
fSeek = aSeek;
fSeekIndex = aSeekIndex;
fSeekSort = aSeekSort;
fPos = aPos;
fSeekLen = aSeekLen;
fSeekSortLen = aSeekSortLen;
fSeekPos = aSeekPos;
fLen = aLen;
}
fSeek[fNseek] = pos;
fSeekLen[fNseek] = len;
fNseek++;
fNtot += len;
}
void TFileCacheRead::SecondPrefetch(Long64_t pos, Int_t len){
fBIsSorted = kFALSE;
if (pos <= 0) {
fBNseek = 0;
fBNtot = 0;
return;
}
if (fBNseek >= fBSeekSize) {
fBSeekSize *= 2;
Long64_t *aSeek = new Long64_t[fBSeekSize];
Int_t *aSeekIndex = new Int_t[fBSeekSize];
Long64_t *aSeekSort = new Long64_t[fBSeekSize];
Long64_t *aPos = new Long64_t[fBSeekSize];
Int_t *aSeekLen = new Int_t[fBSeekSize];
Int_t *aSeekSortLen = new Int_t[fBSeekSize];
Int_t *aSeekPos = new Int_t[fBSeekSize];
Int_t *aLen = new Int_t[fBSeekSize];
for (Int_t i=0;i<fBNseek;i++) {
aSeek[i] = fBSeek[i];
aSeekIndex[i] = fBSeekIndex[i];
aSeekSort[i] = fBSeekSort[i];
aPos[i] = fBPos[i];
aSeekLen[i] = fBSeekLen[i];
aSeekSortLen[i] = fBSeekSortLen[i];
aSeekPos[i] = fBSeekPos[i];
aLen[i] = fBLen[i];
}
delete [] fBSeek;
delete [] fBSeekIndex;
delete [] fBSeekSort;
delete [] fBPos;
delete [] fBSeekLen;
delete [] fBSeekSortLen;
delete [] fBSeekPos;
delete [] fBLen;
fBSeek = aSeek;
fBSeekIndex = aSeekIndex;
fBSeekSort = aSeekSort;
fBPos = aPos;
fBSeekLen = aSeekLen;
fBSeekSortLen = aSeekSortLen;
fBSeekPos = aSeekPos;
fBLen = aLen;
}
fBSeek[fBNseek] = pos;
fBSeekLen[fBNseek] = len;
fBNseek++;
fBNtot += len;
}
void TFileCacheRead::Print(Option_t *option) const
{
TString opt = option;
opt.ToLower();
printf("Cached Reading.....................: %lld bytes in %d transactions\n",this->GetBytesRead(), this->GetReadCalls());
printf("Reading............................: %lld bytes in %d uncached transactions\n",this->GetNoCacheBytesRead(), this->GetNoCacheReadCalls());
printf("Readahead..........................: %d bytes with overhead = %lld bytes\n",TFile::GetReadaheadSize(),this->GetBytesReadExtra());
printf("Average transaction................: %f Kbytes\n",0.001*Double_t(this->GetBytesRead())/Double_t(this->GetReadCalls()));
printf("Number of blocks in current cache..: %d, total size: %d\n",fNseek,fNtot);
if (fPrefetch){
printf("Prefetching .......................: %lli blocks\n", fPrefetchedBlocks);
printf("Prefetching Wait Time..............: %f seconds\n", fPrefetch->GetWaitTime() / 1e+6);
}
if (!opt.Contains("a")) return;
for (Int_t i=0;i<fNseek;i++) {
if (fIsSorted && !opt.Contains("s")) {
printf("block: %5d, from: %lld to %lld, len = %d bytes\n",i,fSeekSort[i],fSeekSort[i]+fSeekSortLen[i],fSeekSortLen[i]);
} else {
printf("block: %5d, from: %lld to %lld, len = %d bytes\n",i,fSeek[i],fSeek[i]+fSeekLen[i],fSeekLen[i]);
}
}
printf ("Number of long buffers = %d\n",fNb);
for (Int_t j=0;j<fNb;j++) {
printf("fPos[%d] = %lld, fLen = %d\n",j,fPos[j],fLen[j]);
}
}
Int_t TFileCacheRead::ReadBuffer(char *buf, Long64_t pos, Int_t len)
{
Long64_t fileBytesRead0 = fFile->GetBytesRead();
Long64_t fileBytesReadExtra0 = fFile->GetBytesReadExtra();
Int_t fileReadCalls0 = fFile->GetReadCalls();
Int_t loc = -1;
Int_t rc = ReadBufferExt(buf, pos, len, loc);
fBytesRead += fFile->GetBytesRead() - fileBytesRead0;
fBytesReadExtra += fFile->GetBytesReadExtra() - fileBytesReadExtra0;
fReadCalls += fFile->GetReadCalls() - fileReadCalls0;
return rc;
}
Int_t TFileCacheRead::ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
{
if (fEnablePrefetching)
return ReadBufferExtPrefetch(buf, pos, len, loc);
else
return ReadBufferExtNormal(buf, pos, len, loc);
}
Int_t TFileCacheRead::ReadBufferExtPrefetch(char *buf, Long64_t pos, Int_t len, Int_t &loc)
{
if (fNseek > 0 && !fIsSorted) {
Sort();
loc = -1;
fPrefetch->ReadBlock(fPos, fLen, fNb);
fPrefetchedBlocks++;
fIsTransferred = kTRUE;
}
if (fBNseek > 0 && !fBIsSorted) {
SecondSort();
loc = -1;
fPrefetch->ReadBlock(fBPos, fBLen, fBNb);
fPrefetchedBlocks++;
}
if (TFileCacheWrite *cachew = fFile->GetCacheWrite()) {
if (cachew->ReadBuffer(buf,pos,len) == 0) {
fFile->SetOffset(pos+len);
return 1;
}
}
if (loc < 0) {
loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
}
if (loc >= 0 && loc < fNseek && pos == fSeekSort[loc]) {
if (buf && fPrefetch){
fPrefetch->ReadBuffer(buf, pos, len);
return 1;
}
}
else if (buf && fPrefetch){
loc = (Int_t)TMath::BinarySearch(fBNseek, fBSeekSort, pos);
if (loc >= 0 && loc < fBNseek && pos == fBSeekSort[loc]){
if (fPrefetch->ReadBuffer(buf, pos, len)) {
return 1;
}
}
}
return 0;
}
Int_t TFileCacheRead::ReadBufferExtNormal(char *buf, Long64_t pos, Int_t len, Int_t &loc)
{
if (fNseek > 0 && !fIsSorted) {
Sort();
loc = -1;
if (!fAsyncReading) {
if (fFile->ReadBuffers(fBuffer,fPos,fLen,fNb)) {
return -1;
}
fIsTransferred = kTRUE;
} else {
fFile->ReadBuffers(0, 0, 0, 0);
if (fFile->ReadBuffers(0,fPos,fLen,fNb)) {
return -1;
}
fIsTransferred = kTRUE;
}
}
if (TFileCacheWrite *cachew = fFile->GetCacheWrite()) {
if (cachew->ReadBuffer(buf,pos,len) == 0) {
fFile->SetOffset(pos+len);
return 1;
}
}
if (fAsyncReading) {
Int_t retval;
if (loc < 0)
loc = (Int_t)TMath::BinarySearch(fNseek,fSeekSort,pos);
if (loc >= 0 && loc < fNseek && pos == fSeekSort[loc]) {
if (buf) {
if (fFile->ReadBuffer(buf, pos, len)) {
return -1;
}
fFile->SetOffset(pos+len);
}
retval = 1;
} else {
retval = 0;
}
if (gDebug > 0)
Info("ReadBuffer","pos=%lld, len=%d, retval=%d, loc=%d, "
"fseekSort[loc]=%lld, fSeekLen[loc]=%d",
pos, len, retval, loc, fSeekSort[loc], fSeekLen[loc]);
return retval;
} else {
if (loc < 0)
loc = (Int_t)TMath::BinarySearch(fNseek, fSeekSort, pos);
if (loc >= 0 && loc <fNseek && pos == fSeekSort[loc]) {
if (buf) {
memcpy(buf,&fBuffer[fSeekPos[loc]],len);
fFile->SetOffset(pos+len);
}
return 1;
}
}
return 0;
}
void TFileCacheRead::SetFile(TFile *file, TFile::ECacheAction action)
{
fFile = file;
if (fAsyncReading) {
if (file && file->ReadBufferAsync(0, 0)) {
fAsyncReading = kFALSE;
fBuffer = new char[fBufferSize];
}
}
if (action == TFile::kDisconnect)
Prefetch(0,0);
if (fPrefetch) {
if (action == TFile::kDisconnect)
SecondPrefetch(0, 0);
fPrefetch->SetFile(file);
}
}
void TFileCacheRead::Sort()
{
if (!fNseek) return;
TMath::Sort(fNseek,fSeek,fSeekIndex,kFALSE);
Int_t i;
Int_t nb = 0;
Int_t effectiveNseek = 0;
for (i=0;i<fNseek;i++) {
Int_t ind = fSeekIndex[i];
if (effectiveNseek!=0 && fSeek[ind]==fSeekSort[effectiveNseek-1])
{
if (fSeekSortLen[effectiveNseek-1] < fSeekLen[ind]) {
fSeekSortLen[effectiveNseek-1] = fSeekLen[ind];
}
continue;
}
fSeekSort[effectiveNseek] = fSeek[ind];
fSeekSortLen[effectiveNseek] = fSeekLen[ind];
++effectiveNseek;
}
fNseek = effectiveNseek;
if (fNtot > fBufferSizeMin) {
fBufferSize = fNtot + 100;
delete [] fBuffer;
fBuffer = 0;
if (!fAsyncReading)
fBuffer = new char[fBufferSize];
}
fPos[0] = fSeekSort[0];
fLen[0] = fSeekSortLen[0];
fSeekPos[0] = 0;
for (i=1;i<fNseek;i++) {
fSeekPos[i] = fSeekPos[i-1] + fSeekSortLen[i-1];
if ((fSeekSort[i] != fSeekSort[i-1]+fSeekSortLen[i-1]) ||
(fLen[nb] > 16000000)) {
nb++;
fPos[nb] = fSeekSort[i];
fLen[nb] = fSeekSortLen[i];
} else {
fLen[nb] += fSeekSortLen[i];
}
}
fNb = nb+1;
fIsSorted = kTRUE;
}
void TFileCacheRead::SecondSort()
{
if (!fBNseek) return;
TMath::Sort(fBNseek,fBSeek,fBSeekIndex,kFALSE);
Int_t i;
Int_t nb = 0;
Int_t effectiveNseek = 0;
for (i=0;i<fBNseek;i++) {
Int_t ind = fBSeekIndex[i];
if (effectiveNseek!=0 && fBSeek[ind]==fBSeekSort[effectiveNseek-1])
{
if (fBSeekSortLen[effectiveNseek-1] < fBSeekLen[ind]) {
fBSeekSortLen[effectiveNseek-1] = fBSeekLen[ind];
}
continue;
}
fBSeekSort[effectiveNseek] = fBSeek[ind];
fBSeekSortLen[effectiveNseek] = fBSeekLen[ind];
++effectiveNseek;
}
fBNseek = effectiveNseek;
if (fBNtot > fBufferSizeMin) {
fBufferSize = fBNtot + 100;
delete [] fBuffer;
fBuffer = 0;
if (!fAsyncReading)
fBuffer = new char[fBufferSize];
}
fBPos[0] = fBSeekSort[0];
fBLen[0] = fBSeekSortLen[0];
fBSeekPos[0] = 0;
for (i=1;i<fBNseek;i++) {
fBSeekPos[i] = fBSeekPos[i-1] + fBSeekSortLen[i-1];
if ((fBSeekSort[i] != fBSeekSort[i-1]+fBSeekSortLen[i-1]) ||
(fBLen[nb] > 16000000)) {
nb++;
fBPos[nb] = fBSeekSort[i];
fBLen[nb] = fBSeekSortLen[i];
} else {
fBLen[nb] += fBSeekSortLen[i];
}
}
fBNb = nb+1;
fBIsSorted = kTRUE;
}
TFilePrefetch* TFileCacheRead::GetPrefetchObj(){
return this->fPrefetch;
}
void TFileCacheRead::WaitFinishPrefetch()
{
if ( fEnablePrefetching && fPrefetch ) {
fPrefetch->WaitFinishPrefetch();
}
}
void TFileCacheRead::SetEnablePrefetching(Bool_t setPrefetching)
{
SetEnablePrefetchingImpl(setPrefetching);
}
void TFileCacheRead::SetEnablePrefetchingImpl(Bool_t setPrefetching)
{
fEnablePrefetching = setPrefetching;
if (!fPrefetch && fEnablePrefetching) {
fPrefetch = new TFilePrefetch(fFile);
const char* cacheDir = gEnv->GetValue("Cache.Directory", "");
if (strcmp(cacheDir, ""))
if (!fPrefetch->SetCache((char*) cacheDir))
fprintf(stderr, "Error while trying to set the cache directory: %s.\n", cacheDir);
if (fPrefetch->ThreadStart()){
fprintf(stderr,"Error stating prefetching thread. Disabling prefetching.\n");
fEnablePrefetching = 0;
}
} else if (fPrefetch && !fEnablePrefetching) {
SafeDelete(fPrefetch);
fPrefetch = NULL;
}
if (fEnablePrefetching) {
fAsyncReading = kFALSE;
}
else {
fAsyncReading = gEnv->GetValue("TFile.AsyncReading", 0);
if (fAsyncReading) {
fAsyncReading = kFALSE;
if (fFile && !(fFile->ReadBufferAsync(0, 0)))
fAsyncReading = kTRUE;
}
if (!fAsyncReading && fBuffer == 0) {
fBuffer = new char[fBufferSize];
}
}
}