// Convert one hexadecimal digit character to its integer value.
// '0'-'9' map to 0-9, 'A'-'F' and 'a'-'f' map to 10-15.
// Any character that is not a hexadecimal digit yields 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}
56 fPendingBlocks =
new TList();
57 fReadBlocks =
new TList();
59 fPendingBlocks->SetOwner();
60 fReadBlocks->SetOwner();
62 fMutexReadList =
new TMutex();
63 fMutexPendingList =
new TMutex();
98 fSemMasterWorker->Post();
100 TMutex *mutexCond = fNewBlockAdded->GetMutex();
101 while ( fSemWorkerMaster->Wait(10) != 0 ) {
103 fNewBlockAdded->Signal();
119 if (CheckBlockInCache(path, block)){
125 if (fFile->GetArchive()) {
127 block->
SetPos(i, block->
GetPos(i) - fFile->GetArchiveOffset());
142 while((block = GetPendingBlock())){
143 ReadAsync(block, inCache);
146 SaveBlockInCache(block);
155 Int_t first = 0, last = -1, mid = -1;
158 while (first <= last){
159 mid = first + (last - first) / 2;
160 if ((offset >= blockObj->
GetPos(mid) && offset <= (blockObj->
GetPos(mid) + blockObj->
GetLen(mid))
161 && ( (offset + len) <= blockObj->
GetPos(mid) + blockObj->
GetLen(mid)))){
166 else if (blockObj->
GetPos(mid) < offset){
181 return Long64_t(fWaitTime.RealTime()*1.e+6);
191 TMutex *mutexBlocks = fMutexReadList;
199 if (BinarySearchReadList(blockObj, offset, len, &index)){
210 fReadBlockAdded->Wait();
217 pBuff += (offset - blockObj->
GetPos(index));
218 memcpy(buf, pBuff, len);
229 TFPBlock* block = CreateBlockObj(offset, len, nblock);
230 AddPendingBlock(block);
238 TMutex *mutexBlocks = fMutexPendingList;
239 TMutex *mutexCond = fNewBlockAdded->GetMutex();
242 fPendingBlocks->Add(block);
246 fNewBlockAdded->Signal();
256 TMutex *mutex = fMutexPendingList;
259 if (fPendingBlocks->GetSize()){
260 block = (
TFPBlock*)fPendingBlocks->First();
261 block = (
TFPBlock*)fPendingBlocks->Remove(block);
272 TMutex *mutexCond = fReadBlockAdded->GetMutex();
273 TMutex *mutex = fMutexReadList;
278 movedBlock = (
TFPBlock*)fReadBlocks->Remove(movedBlock);
283 fReadBlocks->Add(block);
288 fReadBlockAdded->Signal();
299 TMutex *mutex = fMutexReadList;
304 blockObj =
static_cast<TFPBlock*
>(fReadBlocks->First());
305 fReadBlocks->Remove(blockObj);
311 blockObj =
new TFPBlock(offset, len, noblock);
335 if (!fThreadJoined) {
336 fSemChangeFile->Wait();
341 fMutexPendingList->Lock();
342 fPendingBlocks->Clear();
343 fMutexPendingList->UnLock();
345 fMutexReadList->Lock();
346 fReadBlocks->Clear();
347 fMutexReadList->UnLock();
351 if (!fThreadJoined) {
352 fSemChangeFile->Post();
365 rc = fConsumer->Run();
380 semChangeFile->
Post();
382 semChangeFile->
Wait();
389 semChangeFile->
Post();
391 semChangeFile->
Wait();
406 const char* ptr =
hex;
409 result +=
xtod(ptr[i]);
419 if (fPathCache ==
"")
441 value = SumHex(fileName);
444 dirName.
Form(
"%i", value);
446 fullPath +=
"/" + dirName +
"/" + fileName;
450 path =
new char[fullPath.
Length() + 1];
451 strlcpy(path, fullPath,fullPath.
Length() + 1);
468 strPath +=
"?filetype=raw";
474 buffer = (
char*) calloc(length,
sizeof(
char));
477 fFile->fBytesRead +=
length;
478 fFile->fgBytesRead +=
length;
479 fFile->SetReadCalls(fFile->GetReadCalls() + 1);
480 fFile->fgReadCalls++;
485 gPerfStats->FileReadEvent(fFile, length, start);
498 if (fPathCache ==
"")
515 TString fullPath( fPathCache );
517 dirName.
Form(
"%i", value);
518 fullPath += (
"/" + dirName);
524 fullPath += (
"/" + fileName);
527 fullPath +=
"?filetype=raw";
530 fullPath +=
"?filetype=raw";
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
void *(* VoidRtnFunc_t)(void *)
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
char * GetPtrToPiece(Int_t index) const
Get block buffer.
TSemaphore * fSemMasterWorker
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Int_t TryWait()
If semaphore value is > 0 then decrement it and return 0.
static const int kMAX_READ_SIZE
Int_t Wait(Int_t millisec=0)
If semaphore value is > 0 then decrement it and carry on.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Long64_t GetWaitTime()
Return the time spent waiting for buffer to be read in microseconds.
TCondition * fNewBlockAdded
Int_t ThreadStart()
Used to start the consumer thread.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Int_t UnLock()
Unlock the mutex.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
const char * Data() const
TSemaphore * fSemChangeFile
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
const char * AsString() const
Return message digest as string.
This code implements the MD5 message-digest algorithm.
std::map< std::string, std::string >::const_iterator iter
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
void SetBuffer(char *)
Set block buffer.
void SetFile(TFile *)
Change the file.
This class represents the encapsulation of a block request.
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Int_t Wait()
Wait to be signaled.
TThread * GetThread() const
Return reference to the consumer thread.
char * GetBuffer() const
Get block buffer.
Double_t length(const TVector2 &v)
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
R__EXTERN TSystem * gSystem
void WaitFinishPrefetch()
Killing the async prefetching thread.
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
Write a buffer to the file.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Int_t GetLen(Int_t) const
Get length of the element at index i.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
Int_t Post()
If any threads are blocked in Wait(), wake one of them up and increment the value of the semaphore...
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
virtual ~TFilePrefetch()
Destructor.
Long64_t GetPos(Int_t) const
Get position of the element at index i.
The TTimeStamp encapsulates seconds and ns since EPOCH.
Int_t Lock()
Lock the mutex.
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
virtual Bool_t SendFileReadProgress(TFile *)
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
Int_t GetNoElem() const
Return number of elements in the block.
Bool_t SetCache(const char *)
Set the path of the cache directory.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Long64_t GetDataSize() const
Return size of the data in the block.
TSemaphore * fSemWorkerMaster
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
virtual void Close(Option_t *option="")
Close a file.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.