#include "TFilePrefetch.h"
#include "TTimeStamp.h"
#include "TVirtualPerfStats.h"
#include "TVirtualMonitoring.h"
#include <iostream>
#include <string>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cctype>
// Upper bound on the number of blocks kept in the list of read blocks: once
// the list holds this many entries, its first block is removed before a new
// one is added (see AddReadBlock) or is recycled (see CreateBlockObj).
static const int kMAX_READ_SIZE = 2;
// Convert a single hexadecimal digit character to its numeric value (0-15).
// Both upper- and lower-case letters are accepted; any character that is not
// a hexadecimal digit yields 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}
// NOTE(review): file-scope using-directive; code in this file may rely on
// unqualified standard-library names, so it is left in place.
using namespace std;
// NOTE(review): presumably the ROOT macro that registers the implementation
// of TFilePrefetch with the class dictionary -- confirm against Rtypes.h.
ClassImp(TFilePrefetch)
// Constructor.
// Stores the file to prefetch from and allocates the two block lists
// (pending / read) together with the synchronisation primitives that guard
// them. The consumer thread is not created here; see ThreadStart().
TFilePrefetch::TFilePrefetch(TFile* file)
{
   // File to read from; no consumer thread yet.
   fFile     = file;
   fConsumer = 0;

   // Block queues and the mutexes protecting each of them.
   fPendingBlocks    = new TList();
   fMutexPendingList = new TMutex();
   fReadBlocks       = new TList();
   fMutexReadList    = new TMutex();

   // Primitives used to coordinate with the consumer thread.
   fMutexSynch     = new TMutex();
   fSem            = new TSemaphore(0);
   fNewBlockAdded  = new TCondition(0);
   fReadBlockAdded = new TCondition(0);
}
// Destructor.
// Asks the consumer thread to stop, joins it, then releases every block
// still queued and all synchronisation primitives.
TFilePrefetch::~TFilePrefetch()
{
   // ThreadProc holds fMutexSynch except while it waits on fNewBlockAdded,
   // so acquiring and releasing it here waits until the consumer is not in
   // the middle of processing blocks.
   fMutexSynch->Lock();
   fMutexSynch->UnLock();

   // Post the semaphore checked by ThreadProc and wake the consumer up.
   fSem->Post();
   fNewBlockAdded->Signal();

   // The consumer only exists if ThreadStart() was called.
   if (fConsumer)
      fConsumer->Join();
   SafeDelete(fConsumer);

   // Deleting a TList does not delete the objects it holds; free the blocks
   // that are still queued so they are not leaked.
   if (fPendingBlocks)
      fPendingBlocks->Delete();
   if (fReadBlocks)
      fReadBlocks->Delete();

   SafeDelete(fPendingBlocks);
   SafeDelete(fReadBlocks);
   SafeDelete(fMutexReadList);
   SafeDelete(fMutexPendingList);
   SafeDelete(fMutexSynch);
   SafeDelete(fNewBlockAdded);
   SafeDelete(fReadBlockAdded);
   SafeDelete(fSem);
}
// Fill the buffer of `block`, either from the local cache or from the file.
//   block   - block whose segments are to be read
//   inCache - set to kTRUE when the data came from the cache, kFALSE when it
//             was read from the file
void TFilePrefetch::ReadAsync(TFPBlock* block, Bool_t &inCache)
{
   // Filled in (with new[]) by CheckBlockInCache() only when it finds the block.
   char* cachedPath = 0;
   const Bool_t cached = CheckBlockInCache(cachedPath, block);

   if (!cached) {
      fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
      if (fFile->GetArchive()) {
         // Subtract the archive offset from every stored segment position.
         for (Int_t seg = 0; seg < block->GetNoElem(); seg++)
            block->SetPos(seg, block->GetPos(seg) - fFile->GetArchiveOffset());
      }
      inCache = kFALSE;
   } else {
      block->SetBuffer(GetBlockFromCache(cachedPath, block->GetFullSize()));
      inCache = kTRUE;
   }

   delete[] cachedPath;
}
void TFilePrefetch::ReadListOfBlocks()
{
Bool_t inCache = kFALSE;
TFPBlock* block = 0;
while((block = GetPendingBlock())){
ReadAsync(block, inCache);
AddReadBlock(block);
if (!inCache)
SaveBlockInCache(block);
}
}
// Binary search over the segments of `blockObj` for one that fully contains
// the range [offset, offset + len).
//   index - receives the position of the matching segment on success
// Returns true when such a segment is found, false otherwise.
// NOTE(review): assumes the segment positions are stored in ascending order.
Bool_t TFilePrefetch::BinarySearchReadList(TFPBlock* blockObj, Long64_t offset, Int_t len, Int_t* index)
{
   Int_t lo = 0;
   Int_t hi = (Int_t) blockObj->GetNoElem() - 1;

   while (lo <= hi) {
      const Int_t probe = lo + (hi - lo) / 2;
      const Long64_t segBegin = blockObj->GetPos(probe);
      const Long64_t segEnd   = segBegin + blockObj->GetLen(probe);

      const Bool_t startsInside = (offset >= segBegin) && (offset <= segEnd);
      const Bool_t endsInside   = (offset + len) <= segEnd;
      if (startsInside && endsInside) {
         *index = probe;
         return true;
      }

      if (segBegin < offset)
         lo = probe + 1;
      else
         hi = probe - 1;
   }
   return false;
}
// Return the real time accumulated in fWaitTime (started and stopped around
// the wait in ReadBuffer), scaled by 1e6.
// NOTE(review): presumably seconds converted to microseconds -- confirm.
Long64_t TFilePrefetch::GetWaitTime()
{
   const Double_t waited = fWaitTime.RealTime();
   return Long64_t(waited * 1.e+6);
}
// Copy `len` bytes starting at file position `offset` into `buf`, taking them
// from a block in the list of read blocks. If no block currently holds the
// requested range, wait on fReadBlockAdded and search again.
// Returns true once the data has been copied.
Bool_t TFilePrefetch::ReadBuffer(char* buf, Long64_t offset, Int_t len)
{
   TMutex *readListMutex = fMutexReadList;
   TFPBlock* hit = 0;
   Int_t segment = -1;
   Bool_t located = false;

   for (;;) {
      readListMutex->Lock();
      TIter nextBlock(fReadBlocks);
      while ((hit = (TFPBlock*) nextBlock.Next())) {
         segment = -1;
         if (BinarySearchReadList(hit, offset, len, &segment)) {
            located = true;
            break;
         }
      }
      if (located)
         break;   // leave the loop with the read-list mutex still held

      // Not available yet: release the list and wait for a new read block,
      // accounting the time spent waiting.
      readListMutex->UnLock();
      fWaitTime.Start(kFALSE);
      fReadBlockAdded->Wait();
      fWaitTime.Stop();
   }

   if (located && segment >= 0 && segment < hit->GetNoElem()) {
      // The block buffer holds its segments back to back, so the start of
      // the matching segment is the sum of the lengths preceding it.
      Int_t skipped = 0;
      for (Int_t seg = 0; seg < segment; seg++)
         skipped += hit->GetLen(seg);

      char* src = hit->GetBuffer();
      src += skipped;
      src += (offset - hit->GetPos(segment));
      memcpy(buf, src, len);
   }

   readListMutex->UnLock();
   return located;
}
// Build a block describing `nblock` segments (positions in `offset`, sizes in
// `len`) and queue it on the pending list.
void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
{
   AddPendingBlock(CreateBlockObj(offset, len, nblock));
}
// Append `block` to the pending list under fMutexPendingList, then signal
// fNewBlockAdded (the condition the consumer thread waits on in ThreadProc).
void TFilePrefetch::AddPendingBlock(TFPBlock* block)
{
   fMutexPendingList->Lock();
   fPendingBlocks->Add(block);
   fMutexPendingList->UnLock();

   fNewBlockAdded->Signal();
}
// Remove and return the first block of the pending list, or 0 when the list
// is empty. The list is accessed under fMutexPendingList.
TFPBlock* TFilePrefetch::GetPendingBlock()
{
   TFPBlock* head = 0;

   fMutexPendingList->Lock();
   if (fPendingBlocks->GetSize() != 0) {
      TFPBlock* first = (TFPBlock*) fPendingBlocks->First();
      head = (TFPBlock*) fPendingBlocks->Remove(first);
   }
   fMutexPendingList->UnLock();

   return head;
}
// Append `block` to the list of read blocks under fMutexReadList. When the
// list already holds kMAX_READ_SIZE blocks, its first block is removed and
// deleted to make room. Signals fReadBlockAdded afterwards.
void TFilePrefetch::AddReadBlock(TFPBlock* block)
{
   fMutexReadList->Lock();
   if (fReadBlocks->GetSize() >= kMAX_READ_SIZE) {
      TFPBlock* first = (TFPBlock*) fReadBlocks->First();
      TFPBlock* evicted = (TFPBlock*) fReadBlocks->Remove(first);
      delete evicted;
   }
   fReadBlocks->Add(block);
   fMutexReadList->UnLock();

   fReadBlockAdded->Signal();
}
// Return a block describing `noblock` segments (positions in `offset`, sizes
// in `len`). When the list of read blocks is full, its first block is taken
// out of the list and re-initialised via ReallocBlock(); otherwise a new
// block is allocated.
TFPBlock* TFilePrefetch::CreateBlockObj(Long64_t* offset, Int_t* len, Int_t noblock)
{
   TFPBlock* recycled = 0;

   fMutexReadList->Lock();
   const Bool_t listFull = fReadBlocks->GetSize() >= kMAX_READ_SIZE;
   if (listFull) {
      recycled = static_cast<TFPBlock*>(fReadBlocks->First());
      fReadBlocks->Remove(recycled);
      recycled->ReallocBlock(offset, len, noblock);
   }
   fMutexReadList->UnLock();

   if (listFull)
      return recycled;

   // List not full: allocate a fresh block outside the lock.
   return new TFPBlock(offset, len, noblock);
}
// Return the consumer thread, or 0 if ThreadStart() has not created it yet.
TThread* TFilePrefetch::GetThread() const
{
   TThread* consumer = fConsumer;
   return consumer;
}
// Replace the file that subsequent reads are issued against.
void TFilePrefetch::SetFile(TFile *file)
{
   this->fFile = file;
}
// Create the consumer thread running ThreadProc() on this object and start
// it. Returns the value returned by TThread::Run().
Int_t TFilePrefetch::ThreadStart()
{
   fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
   return fConsumer->Run();
}
// Body of the consumer thread created in ThreadStart(). `arg` is the
// TFilePrefetch instance passed as void*.
// The thread repeatedly drains the pending list via ReadListOfBlocks() and
// then waits on fNewBlockAdded until it is signalled (AddPendingBlock and the
// destructor both signal it). fMutexSynch is held the whole time except
// around that wait.
// NOTE(review): the destructor posts fSem before joining, so a TryWait()
// result of 0 is presumably the "stop requested" case -- confirm against the
// TSemaphore::TryWait documentation.
TThread::VoidRtnFunc_t TFilePrefetch::ThreadProc(void* arg)
{
TFilePrefetch* tmp = (TFilePrefetch*) arg;
tmp->fMutexSynch->Lock();
// Keep going for as long as the semaphore could not be acquired.
while(tmp->fSem->TryWait() !=0){
tmp->ReadListOfBlocks();
// Re-check the semaphore after the (possibly long) read before going to sleep.
if (tmp->fSem->TryWait() == 0) break;
// Release fMutexSynch while idle so the destructor's Lock()/UnLock() pair can proceed.
tmp->fMutexSynch->UnLock();
tmp->fNewBlockAdded->Wait();
tmp->fMutexSynch->Lock();
}
tmp->fMutexSynch->UnLock();
return (TThread::VoidRtnFunc_t) 1;
}
// Return the sum of the values of the hexadecimal digits in `hex`.
// Characters that are not hexadecimal digits contribute 0 (see xtod()).
// A null pointer is treated like an empty string and yields 0.
Int_t TFilePrefetch::SumHex(const char *hex)
{
   Int_t result = 0;
   if (!hex)
      return result;

   // Single pass up to the terminator; avoids re-running strlen() on every
   // iteration of the loop.
   for (const char* ptr = hex; *ptr != '\0'; ++ptr)
      result += xtod(*ptr);
   return result;
}
// Check whether `block` has already been saved in the cache directory.
//   path  - on success, receives a new[]-allocated copy of the full path of
//           the cache file (the caller must delete[] it); left untouched
//           otherwise
//   block - block to look up
// Returns true when the cache file exists, false when it does not or when no
// cache directory has been configured.
// The cache file name is the MD5 of the decimal segment positions of the
// block; it lives in a sub-directory named after the sum of the hexadecimal
// digits of that MD5, modulo 16. SaveBlockInCache() uses the same scheme.
Bool_t TFilePrefetch::CheckBlockInCache(char*& path, TFPBlock* block)
{
   if (fPathCache == "")
      return false;

   TString fullPath(fPathCache);

   // Create the cache directory if it is missing. The handle returned by
   // OpenDirectory() is only used as an existence probe and must be released.
   void* dirHandle = gSystem->OpenDirectory(fullPath);
   if (dirHandle)
      gSystem->FreeDirectory(dirHandle);
   else
      gSystem->mkdir(fullPath);

   // Hash the segment positions to obtain the cache file name.
   TMD5 md;
   TString concatStr;
   for (Int_t i = 0; i < block->GetNoElem(); i++) {
      concatStr.Form("%lld", block->GetPos(i));
      md.Update((UChar_t*)concatStr.Data(), concatStr.Length());
   }
   md.Final();
   TString fileName(md.AsString());

   const Int_t value = SumHex(fileName) % 16;
   TString dirName;
   dirName.Form("%i", value);
   fullPath += "/" + dirName + "/" + fileName;

   FileStat_t stat;
   if (gSystem->GetPathInfo(fullPath, stat) != 0)
      return false;

   path = new char[fullPath.Length() + 1];
   strlcpy(path, fullPath, fullPath.Length() + 1);
   return true;
}
char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
{
char *buffer = 0;
TString strPath = path;
strPath += "?filetype=raw";
TFile* file = new TFile(strPath);
Double_t start = 0;
if (gPerfStats != 0) start = TTimeStamp();
buffer = (char*) calloc(length+1, sizeof(char));
file->ReadBuffer(buffer, 0, length);
fFile->fBytesRead += length;
fFile->fgBytesRead += length;
fFile->SetReadCalls(fFile->GetReadCalls() + 1);
fFile->fgReadCalls++;
if (gMonitoringWriter)
gMonitoringWriter->SendFileReadProgress(fFile);
if (gPerfStats != 0) {
gPerfStats->FileReadEvent(fFile, length, start);
}
delete file;
return buffer;
}
// Write the buffer of `block` to the cache directory. Does nothing when no
// cache directory has been configured, or when the cache file cannot be
// opened (caching is best effort).
// The cache file name is the MD5 of the decimal segment positions of the
// block; it is placed in a sub-directory named after the sum of the
// hexadecimal digits of that MD5, modulo 16. CheckBlockInCache() uses the
// same scheme.
void TFilePrefetch::SaveBlockInCache(TFPBlock* block)
{
   if (fPathCache == "")
      return;

   // Hash the segment positions to obtain the cache file name.
   TMD5 md;
   TString concatStr;
   for (Int_t i = 0; i < block->GetNoElem(); i++) {
      concatStr.Form("%lld", block->GetPos(i));
      md.Update((UChar_t*)concatStr.Data(), concatStr.Length());
   }
   md.Final();
   TString fileName(md.AsString());

   const Int_t value = SumHex(fileName) % 16;
   TString fullPath(fPathCache);
   TString dirName;
   dirName.Form("%i", value);
   fullPath += ("/" + dirName);

   // Create the sub-directory if it is missing. The handle returned by
   // OpenDirectory() is only used as an existence probe and must be released.
   void* dirHandle = gSystem->OpenDirectory(fullPath);
   if (dirHandle)
      gSystem->FreeDirectory(dirHandle);
   else
      gSystem->mkdir(fullPath);

   fullPath += ("/" + fileName);

   // Update the file if it already exists, otherwise create it.
   FileStat_t stat;
   const char* mode = (gSystem->GetPathInfo(fullPath, stat) == 0) ? "update" : "new";
   fullPath += "?filetype=raw";
   TFile* file = TFile::Open(fullPath, mode);

   // TFile::Open() returns 0 on failure; skip caching rather than crash.
   if (!file)
      return;

   file->WriteBuffer(block->GetBuffer(), block->GetFullSize());
   file->Close();
   delete file;
}
// Validate the location given for the cache.
// The string must contain ":/" at a position greater than 0, and everything
// after that separator may only consist of letters, digits, '/' and ':'.
// Returns true when the location is acceptable, false otherwise.
Bool_t TFilePrefetch::CheckCachePath(const char* locationCache)
{
   TString path = locationCache;
   const Ssiz_t pos = path.Index(":/");
   if (pos <= 0)
      return false;

   // Part following the ":/" separator.
   TString directory(path(pos + 2, path.Length()));

   for (Int_t i = 0; i < directory.Length(); i++) {
      // The <cctype> classifiers are undefined for negative values, so go
      // through unsigned char before calling them.
      const unsigned char c = static_cast<unsigned char>(directory[i]);
      if (!isalnum(c) && c != '/' && c != ':')
         return false;
   }
   return true;
}
// Set the directory used to cache blocks.
// The location is validated with CheckCachePath(); when it is accepted it is
// stored and the directory is created if it cannot be opened.
// Returns true when the location was accepted, false otherwise.
Bool_t TFilePrefetch::SetCache(const char* path)
{
   if (!CheckCachePath(path))
      return false;

   fPathCache = path;

   // The handle returned by OpenDirectory() is only used as an existence
   // probe and must be released again.
   void* dirHandle = gSystem->OpenDirectory(path);
   if (dirHandle)
      gSystem->FreeDirectory(dirHandle);
   else
      gSystem->mkdir(path);

   return true;
}