ROOT  6.06/09
Reference Guide
TFilePrefetch.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Elvin Sindrilaru 19/05/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "TFilePrefetch.h"
13 #include "TTimeStamp.h"
14 #include "TVirtualPerfStats.h"
15 #include "TVirtualMonitoring.h"
16 
17 #include <iostream>
18 #include <string>
19 #include <sstream>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cctype>
23 
// Maximum number of already-read blocks kept in the read list.
static const int kMAX_READ_SIZE = 2;

/// Convert one hexadecimal digit character to its integer value.
/// Accepts '0'-'9', 'A'-'F' and 'a'-'f'; any other character maps to 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return 10 + (c - 'A');
   if (c >= 'a' && c <= 'f')
      return 10 + (c - 'a');
   return 0;
}
27 
28 using namespace std;
29 
31 
32 /**
33 \class TFilePrefetch
34 \ingroup IO
35 
36 The prefetching mechanism uses two classes (TFilePrefetch and
37 TFPBlock) to prefetch a block of tree entries in advance. There is
38 a thread which takes care of actually transferring the blocks and
39 making them available to the main requesting thread. Therefore,
40 the time spent by the main thread waiting for the data before
41 processing considerably decreases. Besides the prefetching
42 mechanism, there is also a local caching option which can be
43 enabled by the user. Both capabilities are disabled by default
44 and must be explicitly enabled by the user.
45 */
46 
47 
48 ////////////////////////////////////////////////////////////////////////////////
49 /// Constructor.
50 
52  fFile(file),
53  fConsumer(0),
54  fThreadJoined(kTRUE)
55 {
56  fPendingBlocks = new TList();
57  fReadBlocks = new TList();
58 
59  fPendingBlocks->SetOwner();
60  fReadBlocks->SetOwner();
61 
62  fMutexReadList = new TMutex();
63  fMutexPendingList = new TMutex();
64  fNewBlockAdded = new TCondition(0);
65  fReadBlockAdded = new TCondition(0);
66  fSemMasterWorker = new TSemaphore(0);
67  fSemWorkerMaster = new TSemaphore(0);
68  fSemChangeFile = new TSemaphore(0);
69 }
70 
71 ////////////////////////////////////////////////////////////////////////////////
72 /// Destructor
73 
75 {
76  if (!fThreadJoined) {
77  WaitFinishPrefetch();
78  }
79 
80  SafeDelete(fConsumer);
81  SafeDelete(fPendingBlocks);
82  SafeDelete(fReadBlocks);
83  SafeDelete(fMutexReadList);
84  SafeDelete(fMutexPendingList);
85  SafeDelete(fNewBlockAdded);
86  SafeDelete(fReadBlockAdded);
87  SafeDelete(fSemMasterWorker);
88  SafeDelete(fSemWorkerMaster);
89  SafeDelete(fSemChangeFile);
90 }
91 
92 
93 ////////////////////////////////////////////////////////////////////////////////
95 /// Stop the async prefetching thread and wait for it to finish (join).
95 
97 {
98  fSemMasterWorker->Post();
99 
100  TMutex *mutexCond = fNewBlockAdded->GetMutex();
101  while ( fSemWorkerMaster->Wait(10) != 0 ) {
102  mutexCond->Lock();
103  fNewBlockAdded->Signal();
104  mutexCond->UnLock();
105  }
106 
107  fConsumer->Join();
108  fThreadJoined=kTRUE;
109 }
110 
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Read one block from the local cache, if present, or otherwise from the file; set inCache accordingly.
114 
116 {
117  char* path = 0;
118 
119  if (CheckBlockInCache(path, block)){
120  block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
121  inCache = kTRUE;
122  }
123  else{
124  fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
125  if (fFile->GetArchive()) {
126  for (Int_t i = 0; i < block->GetNoElem(); i++)
127  block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
128  }
129  inCache =kFALSE;
130  }
131  delete[] path;
132 }
133 
134 ////////////////////////////////////////////////////////////////////////////////
135 /// Read every block in the pending list, add it to the read list, and save it in the cache if it was not read from there.
136 
138 {
139  Bool_t inCache = kFALSE;
140  TFPBlock* block = 0;
141 
142  while((block = GetPendingBlock())){
143  ReadAsync(block, inCache);
144  AddReadBlock(block);
145  if (!inCache)
146  SaveBlockInCache(block);
147  }
148 }
149 
150 ////////////////////////////////////////////////////////////////////////////////
151 /// Binary-search a block for the element containing [offset, offset+len); on success store its index in *index and return true.
152 
154 {
155  Int_t first = 0, last = -1, mid = -1;
156  last = (Int_t) blockObj->GetNoElem()-1;
157 
158  while (first <= last){
159  mid = first + (last - first) / 2;
160  if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
161  && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
162 
163  *index = mid;
164  return true;
165  }
166  else if (blockObj->GetPos(mid) < offset){
167  first = mid + 1;
168  }
169  else{
170  last = mid - 1;
171  }
172  }
173  return false;
174 }
175 
176 ////////////////////////////////////////////////////////////////////////////////
177 /// Return the time spent waiting for the buffer to be read, in microseconds.
178 
180 {
181  return Long64_t(fWaitTime.RealTime()*1.e+6);
182 }
183 
184 ////////////////////////////////////////////////////////////////////////////////
185 /// Return a prefetched element.
186 
188 {
189  Bool_t found = false;
190  TFPBlock* blockObj = 0;
191  TMutex *mutexBlocks = fMutexReadList;
192  Int_t index = -1;
193 
194  while (1){
195  mutexBlocks->Lock();
196  TIter iter(fReadBlocks);
197  while ((blockObj = (TFPBlock*) iter.Next())){
198  index = -1;
199  if (BinarySearchReadList(blockObj, offset, len, &index)){
200  found = true;
201  break;
202  }
203  }
204  if (found)
205  break;
206  else{
207  mutexBlocks->UnLock();
208 
209  fWaitTime.Start(kFALSE);
210  fReadBlockAdded->Wait(); //wait for a new block to be added
211  fWaitTime.Stop();
212  }
213  }
214 
215  if (found){
216  char *pBuff = blockObj->GetPtrToPiece(index);
217  pBuff += (offset - blockObj->GetPos(index));
218  memcpy(buf, pBuff, len);
219  }
220  mutexBlocks->UnLock();
221  return found;
222 }
223 
224 ////////////////////////////////////////////////////////////////////////////////
225 /// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
226 
227 void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
228 {
229  TFPBlock* block = CreateBlockObj(offset, len, nblock);
230  AddPendingBlock(block);
231 }
232 
233 ////////////////////////////////////////////////////////////////////////////////
234 /// Safe method to add a block to the pendingList.
235 
237 {
238  TMutex *mutexBlocks = fMutexPendingList;
239  TMutex *mutexCond = fNewBlockAdded->GetMutex();
240 
241  mutexBlocks->Lock();
242  fPendingBlocks->Add(block);
243  mutexBlocks->UnLock();
244 
245  mutexCond->Lock();
246  fNewBlockAdded->Signal();
247  mutexCond->UnLock();
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Safe method to remove a block from the pendingList.
252 
254 {
255  TFPBlock* block = 0;
256  TMutex *mutex = fMutexPendingList;
257  mutex->Lock();
258 
259  if (fPendingBlocks->GetSize()){
260  block = (TFPBlock*)fPendingBlocks->First();
261  block = (TFPBlock*)fPendingBlocks->Remove(block);
262  }
263  mutex->UnLock();
264  return block;
265 }
266 
267 ////////////////////////////////////////////////////////////////////////////////
268 /// Safe method to add a block to the readList.
269 
271 {
272  TMutex *mutexCond = fReadBlockAdded->GetMutex();
273  TMutex *mutex = fMutexReadList;
274  mutex->Lock();
275 
276  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
277  TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
278  movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
279  delete movedBlock;
280  movedBlock = 0;
281  }
282 
283  fReadBlocks->Add(block);
284  mutex->UnLock();
285 
286  //signal the addition of a new block
287  mutexCond->Lock();
288  fReadBlockAdded->Signal();
289  mutexCond->UnLock();
290 }
291 
292 
293 ////////////////////////////////////////////////////////////////////////////////
294 /// Create a new block or recycle an old one.
295 
297 {
298  TFPBlock* blockObj = 0;
299  TMutex *mutex = fMutexReadList;
300 
301  mutex->Lock();
302 
303  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
304  blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
305  fReadBlocks->Remove(blockObj);
306  mutex->UnLock();
307  blockObj->ReallocBlock(offset, len, noblock);
308  }
309  else{
310  mutex->UnLock();
311  blockObj = new TFPBlock(offset, len, noblock);
312  }
313  return blockObj;
314 }
315 
316 ////////////////////////////////////////////////////////////////////////////////
317 /// Return reference to the consumer thread.
318 
320 {
321  return fConsumer;
322 }
323 
324 
325 ////////////////////////////////////////////////////////////////////////////////
326 /// Change the file
327 ///
328 /// When prefetching is enabled we also need to:
329 /// - make sure the async thread is not doing any work
330 /// - clear all blocks from prefetching and read list
331 /// - reset the file pointer
332 
334 {
335  if (!fThreadJoined) {
336  fSemChangeFile->Wait();
337  }
338 
339  if (fFile) {
340  // Remove all pending and read blocks
341  fMutexPendingList->Lock();
342  fPendingBlocks->Clear();
343  fMutexPendingList->UnLock();
344 
345  fMutexReadList->Lock();
346  fReadBlocks->Clear();
347  fMutexReadList->UnLock();
348  }
349 
350  fFile = file;
351  if (!fThreadJoined) {
352  fSemChangeFile->Post();
353  }
354 }
355 
356 
357 ////////////////////////////////////////////////////////////////////////////////
358 /// Used to start the consumer thread.
359 
361 {
362  int rc;
363 
364  fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
365  rc = fConsumer->Run();
366  if ( !rc ) {
367  fThreadJoined = kFALSE;
368  }
369  return rc;
370 }
371 
372 
373 ////////////////////////////////////////////////////////////////////////////////
374 /// Execution loop of the consumer thread.
375 
377 {
378  TFilePrefetch* pClass = (TFilePrefetch*) arg;
379  TSemaphore* semChangeFile = pClass->fSemChangeFile;
380  semChangeFile->Post();
381  pClass->fNewBlockAdded->Wait();
382  semChangeFile->Wait();
383 
384  while( pClass->fSemMasterWorker->TryWait() != 0 ) {
385  pClass->ReadListOfBlocks();
386 
387  // Use the semaphore to deal with the case when the file pointer
388  // is changed on the fly by TChain
389  semChangeFile->Post();
390  pClass->fNewBlockAdded->Wait();
391  semChangeFile->Wait();
392  }
393 
394  pClass->fSemWorkerMaster->Post();
395  return (TThread::VoidRtnFunc_t) 1;
396 }
397 
398 //############################# CACHING PART ###################################
399 
400 ////////////////////////////////////////////////////////////////////////////////
401 /// Sum up individual hex values to obtain a decimal value.
402 
404 {
405  Int_t result = 0;
406  const char* ptr = hex;
407 
408  for(Int_t i=0; i < (Int_t)strlen(hex); i++)
409  result += xtod(ptr[i]);
410 
411  return result;
412 }
413 
414 ////////////////////////////////////////////////////////////////////////////////
415 /// Test if the block is in cache.
416 
418 {
419  if (fPathCache == "")
420  return false;
421 
422  Bool_t found = false;
423  TString fullPath(fPathCache); // path of the cached files.
424 
425  Int_t value = 0;
426 
427  if (!gSystem->OpenDirectory(fullPath))
428  gSystem->mkdir(fullPath);
429 
430  //dir is SHA1 value modulo 16; filename is the value of the SHA1(offset+len)
431  TMD5* md = new TMD5();
432 
433  TString concatStr;
434  for (Int_t i=0; i < block->GetNoElem(); i++){
435  concatStr.Form("%lld", block->GetPos(i));
436  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
437  }
438 
439  md->Final();
440  TString fileName( md->AsString() );
441  value = SumHex(fileName);
442  value = value % 16;
443  TString dirName;
444  dirName.Form("%i", value);
445 
446  fullPath += "/" + dirName + "/" + fileName;
447 
448  FileStat_t stat;
449  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
450  path = new char[fullPath.Length() + 1];
451  strlcpy(path, fullPath,fullPath.Length() + 1);
452  found = true;
453  } else
454  found = false;
455 
456  delete md;
457  return found;
458 }
459 
460 ////////////////////////////////////////////////////////////////////////////////
461 /// Return a buffer from cache.
462 
464 {
465  char *buffer = 0;
466  TString strPath = path;
467 
468  strPath += "?filetype=raw";
469  TFile* file = new TFile(strPath);
470 
471  Double_t start = 0;
472  if (gPerfStats != 0) start = TTimeStamp();
473 
474  buffer = (char*) calloc(length, sizeof(char));
475  file->ReadBuffer(buffer, 0, length);
476 
477  fFile->fBytesRead += length;
478  fFile->fgBytesRead += length;
479  fFile->SetReadCalls(fFile->GetReadCalls() + 1);
480  fFile->fgReadCalls++;
481 
482  if (gMonitoringWriter)
484  if (gPerfStats != 0) {
485  gPerfStats->FileReadEvent(fFile, length, start);
486  }
487 
488  file->Close();
489  delete file;
490  return buffer;
491 }
492 
493 ////////////////////////////////////////////////////////////////////////////////
494 /// Save the block content in cache.
495 
497 {
498  if (fPathCache == "")
499  return;
500 
501  //dir is SHA1 value modulo 16; filename is the value of the SHA1
502  TMD5* md = new TMD5();
503 
504  TString concatStr;
505  for(Int_t i=0; i< block->GetNoElem(); i++){
506  concatStr.Form("%lld", block->GetPos(i));
507  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
508  }
509  md->Final();
510 
511  TString fileName( md->AsString() );
512  Int_t value = SumHex(fileName);
513  value = value % 16;
514 
515  TString fullPath( fPathCache );
516  TString dirName;
517  dirName.Form("%i", value);
518  fullPath += ("/" + dirName);
519 
520  if (!gSystem->OpenDirectory(fullPath))
521  gSystem->mkdir(fullPath);
522 
523  TFile* file = 0;
524  fullPath += ("/" + fileName);
525  FileStat_t stat;
526  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
527  fullPath += "?filetype=raw";
528  file = TFile::Open(fullPath, "update");
529  } else{
530  fullPath += "?filetype=raw";
531  file = TFile::Open(fullPath, "new");
532  }
533 
534  if (file) {
535  // coverity[unchecked_value] We do not print error message, have not error
536  // return code and close the file anyway, not need to check the return value.
537  file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
538  file->Close();
539  delete file;
540  }
541  delete md;
542 }
543 
544 
545 ////////////////////////////////////////////////////////////////////////////////
546 /// Set the path of the cache directory.
547 
549 {
550  fPathCache = path;
551 
552  if (!gSystem->OpenDirectory(path)){
553  return (!gSystem->mkdir(path) ? true : false);
554  }
555 
556  // Directory already exists
557  return true;
558 }
559 
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Definition: TMutex.h:37
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:61
void ReadAsync(TFPBlock *, Bool_t &)
Read one block from the local cache, if present, or otherwise from the file.
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
long long Long64_t
Definition: RtypesCore.h:69
unsigned int hex
Definition: math.cpp:442
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:165
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:111
Ssiz_t Length() const
Definition: TString.h:390
TSemaphore * fSemMasterWorker
Definition: TFilePrefetch.h:64
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1363
Int_t TryWait()
If semaphore value is > 0 then decrement it and return 0.
Definition: TSemaphore.cxx:81
int xtod(char c)
static const int kMAX_READ_SIZE
Int_t Wait(Int_t millisec=0)
If semaphore value is > 0 then decrement it and carry on.
Definition: TSemaphore.cxx:38
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1596
Long64_t GetWaitTime()
Return the time spent waiting for the buffer to be read, in microseconds.
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
STL namespace.
TCondition * fNewBlockAdded
Definition: TFilePrefetch.h:62
Int_t ThreadStart()
Used to start the consumer thread.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:900
Int_t UnLock()
Unlock the mutex.
Definition: TMutex.cxx:68
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
const char * Data() const
Definition: TString.h:349
TSemaphore * fSemChangeFile
Definition: TFilePrefetch.h:66
#define SafeDelete(p)
Definition: RConfig.h:436
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition: TFPBlock.cxx:72
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:218
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
std::map< std::string, std::string >::const_iterator iter
Definition: TAlienJob.cxx:54
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
void SetBuffer(char *)
Set block buffer.
Definition: TFPBlock.cxx:81
void SetFile(TFile *)
Change the file.
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:24
A doubly linked list.
Definition: TList.h:47
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Int_t Wait()
Wait to be signaled.
Definition: TCondition.cxx:74
TThread * GetThread() const
Return reference to the consumer thread.
char * GetBuffer() const
Get block buffer.
Definition: TFPBlock.h:105
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:106
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
void WaitFinishPrefetch()
Stop the async prefetching thread and wait for it to finish.
virtual Bool_t WriteBuffer(const char *buf, Int_t len)
Write a buffer to the file.
Definition: TFile.cxx:2288
TObject * Next()
Definition: TCollection.h:158
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:99
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
Int_t Post()
If any threads are blocked in Wait(), wake one of them up and increment the value of the semaphore...
Definition: TSemaphore.cxx:105
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
#define gPerfStats
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
virtual ~TFilePrefetch()
Destructor.
#define ClassImp(name)
Definition: Rtypes.h:279
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:93
double Double_t
Definition: RtypesCore.h:55
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:76
Int_t Lock()
Lock the mutex.
Definition: TMutex.cxx:46
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:53
virtual Bool_t SendFileReadProgress(TFile *)
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:87
Bool_t SetCache(const char *)
Set the path of the cache directory.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:830
Long64_t GetDataSize() const
Return size of the data in the block.
Definition: TFPBlock.h:75
double result[121]
unsigned char UChar_t
Definition: RtypesCore.h:34
const Bool_t kTRUE
Definition: Rtypes.h:91
TSemaphore * fSemWorkerMaster
Definition: TFilePrefetch.h:65
float value
Definition: math.cpp:443
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
virtual void Close(Option_t *option="")
Close a file.
Definition: TFile.cxx:898
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.