Logo ROOT   6.08/07
Reference Guide
TFilePrefetch.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Elvin Sindrilaru 19/05/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "TFilePrefetch.h"
13 #include "TTimeStamp.h"
14 #include "TVirtualPerfStats.h"
15 #include "TVirtualMonitoring.h"
16 
17 #include <iostream>
18 #include <string>
19 #include <sstream>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cctype>
23 
24 static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
25 
/// Convert one hexadecimal digit character to its numeric value.
///
/// \param c character to convert; '0'-'9', 'A'-'F' and 'a'-'f' are recognised
/// \return the digit value in the range 0..15, or 0 for any other character
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0; // characters that are not hex digits contribute nothing
}
27 
28 using namespace std;
29 
31 
32 /**
33 \class TFilePrefetch
34 \ingroup IO
35 
36 The prefetching mechanism uses two classes (TFilePrefetch and
37 TFPBlock) to prefetch in advance a block of tree entries. There is
38 a thread which takes care of actually transferring the blocks and
39 making them available to the main requesting thread. Therefore,
40 the time spent by the main thread waiting for the data before
41 processing considerably decreases. Besides the prefetching
42 mechanisms there is also a local caching option which can be
43 enabled by the user. Both capabilities are disabled by default
44 and must be explicitly enabled by the user.
45 */
46 
47 
48 ////////////////////////////////////////////////////////////////////////////////
49 /// Constructor.
50 
52  fFile(file),
53  fConsumer(0),
54  fThreadJoined(kTRUE),
55  fPrefetchFinished(kFALSE)
56 {
57  fPendingBlocks = new TList();
58  fReadBlocks = new TList();
59 
60  fPendingBlocks->SetOwner();
61  fReadBlocks->SetOwner();
62 
63  fSemChangeFile = new TSemaphore(0);
64 }
65 
66 ////////////////////////////////////////////////////////////////////////////////
67 /// Destructor
68 
70 {
71  if (!fThreadJoined) {
72  WaitFinishPrefetch();
73  }
74 
75  SafeDelete(fConsumer);
76  SafeDelete(fPendingBlocks);
77  SafeDelete(fReadBlocks);
78  SafeDelete(fSemChangeFile);
79 }
80 
81 
82 ////////////////////////////////////////////////////////////////////////////////
83 /// Killing the async prefetching thread
84 
86 {
87  // Inform the consumer thread that prefetching is over
88  {
89  std::lock_guard<std::mutex> lk(fMutexPendingList);
90  fPrefetchFinished = kTRUE;
91  }
92  fNewBlockAdded.notify_one();
93 
94  fConsumer->Join();
95  fThreadJoined = kTRUE;
96  fPrefetchFinished = kFALSE;
97 }
98 
99 
100 ////////////////////////////////////////////////////////////////////////////////
101 /// Read one block and insert it in prefetchBuffers list.
102 
104 {
105  char* path = 0;
106 
107  if (CheckBlockInCache(path, block)){
108  block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
109  inCache = kTRUE;
110  }
111  else{
112  fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
113  if (fFile->GetArchive()) {
114  for (Int_t i = 0; i < block->GetNoElem(); i++)
115  block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
116  }
117  inCache =kFALSE;
118  }
119  delete[] path;
120 }
121 
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Get blocks specified in prefetchBlocks.
124 
126 {
127  Bool_t inCache = kFALSE;
128  TFPBlock* block = 0;
129 
130  while((block = GetPendingBlock())){
131  ReadAsync(block, inCache);
132  AddReadBlock(block);
133  if (!inCache)
134  SaveBlockInCache(block);
135  }
136 }
137 
138 ////////////////////////////////////////////////////////////////////////////////
139 /// Search for a requested element in a block and return the index.
140 
142 {
143  Int_t first = 0, last = -1, mid = -1;
144  last = (Int_t) blockObj->GetNoElem()-1;
145 
146  while (first <= last){
147  mid = first + (last - first) / 2;
148  if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
149  && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
150 
151  *index = mid;
152  return true;
153  }
154  else if (blockObj->GetPos(mid) < offset){
155  first = mid + 1;
156  }
157  else{
158  last = mid - 1;
159  }
160  }
161  return false;
162 }
163 
164 ////////////////////////////////////////////////////////////////////////////////
165 /// Return the time spent waiting for a buffer to be read, in microseconds.
166 
168 {
169  return Long64_t(fWaitTime.RealTime()*1.e+6);
170 }
171 
172 ////////////////////////////////////////////////////////////////////////////////
173 /// Return a prefetched element.
174 
176 {
177  Bool_t found = false;
178  TFPBlock* blockObj = 0;
179  Int_t index = -1;
180 
181  std::unique_lock<std::mutex> lk(fMutexReadList);
182  while (1){
183  TIter iter(fReadBlocks);
184  while ((blockObj = (TFPBlock*) iter.Next())){
185  index = -1;
186  if (BinarySearchReadList(blockObj, offset, len, &index)){
187  found = true;
188  break;
189  }
190  }
191  if (found)
192  break;
193  else{
194  fWaitTime.Start(kFALSE);
195  fReadBlockAdded.wait(lk); //wait for a new block to be added
196  fWaitTime.Stop();
197  }
198  }
199 
200  if (found){
201  char *pBuff = blockObj->GetPtrToPiece(index);
202  pBuff += (offset - blockObj->GetPos(index));
203  memcpy(buf, pBuff, len);
204  }
205  return found;
206 }
207 
208 ////////////////////////////////////////////////////////////////////////////////
209 /// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
210 
211 void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
212 {
213  TFPBlock* block = CreateBlockObj(offset, len, nblock);
214  AddPendingBlock(block);
215 }
216 
217 ////////////////////////////////////////////////////////////////////////////////
218 /// Safe method to add a block to the pendingList.
219 
221 {
222  fMutexPendingList.lock();
223  fPendingBlocks->Add(block);
224  fMutexPendingList.unlock();
225 
226  fNewBlockAdded.notify_one();
227 }
228 
229 ////////////////////////////////////////////////////////////////////////////////
230 /// Safe method to remove a block from the pendingList.
231 
233 {
234  TFPBlock* block = 0;
235 
236  // Use the semaphore to deal with the case when the file pointer
237  // is changed on the fly by TChain
238  fSemChangeFile->Post();
239  std::unique_lock<std::mutex> lk(fMutexPendingList);
240  // Wait unless there is a pending block or prefetching is over
241  fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
242  lk.unlock();
243  fSemChangeFile->Wait();
244 
245  lk.lock();
246  if (fPendingBlocks->GetSize()){
247  block = (TFPBlock*)fPendingBlocks->First();
248  block = (TFPBlock*)fPendingBlocks->Remove(block);
249  }
250  return block;
251 }
252 
253 ////////////////////////////////////////////////////////////////////////////////
254 /// Safe method to add a block to the readList.
255 
257 {
258  fMutexReadList.lock();
259 
260  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
261  TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
262  movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
263  delete movedBlock;
264  movedBlock = 0;
265  }
266 
267  fReadBlocks->Add(block);
268  fMutexReadList.unlock();
269 
270  //signal the addition of a new block
271  fReadBlockAdded.notify_one();
272 }
273 
274 
275 ////////////////////////////////////////////////////////////////////////////////
276 /// Create a new block or recycle an old one.
277 
279 {
280  TFPBlock* blockObj = 0;
281 
282  fMutexReadList.lock();
283 
284  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
285  blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
286  fReadBlocks->Remove(blockObj);
287  fMutexReadList.unlock();
288  blockObj->ReallocBlock(offset, len, noblock);
289  }
290  else{
291  fMutexReadList.unlock();
292  blockObj = new TFPBlock(offset, len, noblock);
293  }
294  return blockObj;
295 }
296 
297 ////////////////////////////////////////////////////////////////////////////////
298 /// Return reference to the consumer thread.
299 
301 {
302  return fConsumer;
303 }
304 
305 
306 ////////////////////////////////////////////////////////////////////////////////
307 /// Change the file
308 ///
309 /// When prefetching is enabled we also need to:
310 /// - make sure the async thread is not doing any work
311 /// - clear all blocks from prefetching and read list
312 /// - reset the file pointer
313 
315 {
316  if (!fThreadJoined) {
317  fSemChangeFile->Wait();
318  }
319 
320  if (fFile) {
321  // Remove all pending and read blocks
322  fMutexPendingList.lock();
323  fPendingBlocks->Clear();
324  fMutexPendingList.unlock();
325 
326  fMutexReadList.lock();
327  fReadBlocks->Clear();
328  fMutexReadList.unlock();
329  }
330 
331  fFile = file;
332  if (!fThreadJoined) {
333  fSemChangeFile->Post();
334  }
335 }
336 
337 
338 ////////////////////////////////////////////////////////////////////////////////
339 /// Used to start the consumer thread.
340 
342 {
343  int rc;
344 
345  fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
346  rc = fConsumer->Run();
347  if ( !rc ) {
348  fThreadJoined = kFALSE;
349  }
350  return rc;
351 }
352 
353 
354 ////////////////////////////////////////////////////////////////////////////////
355 /// Execution loop of the consumer thread.
356 
358 {
359  TFilePrefetch* pClass = (TFilePrefetch*) arg;
360 
361  while (!pClass->IsPrefetchFinished()) {
362  pClass->ReadListOfBlocks();
363  }
364 
365  return (TThread::VoidRtnFunc_t) 1;
366 }
367 
368 //############################# CACHING PART ###################################
369 
370 ////////////////////////////////////////////////////////////////////////////////
371 /// Sum up individual hex values to obtain a decimal value.
372 
373 Int_t TFilePrefetch::SumHex(const char *hex)
374 {
375  Int_t result = 0;
376  const char* ptr = hex;
377 
378  for(Int_t i=0; i < (Int_t)strlen(hex); i++)
379  result += xtod(ptr[i]);
380 
381  return result;
382 }
383 
384 ////////////////////////////////////////////////////////////////////////////////
385 /// Test if the block is in cache.
386 
388 {
389  if (fPathCache == "")
390  return false;
391 
392  Bool_t found = false;
393  TString fullPath(fPathCache); // path of the cached files.
394 
395  Int_t value = 0;
396 
397  if (!gSystem->OpenDirectory(fullPath))
398  gSystem->mkdir(fullPath);
399 
400 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 digest of the block's element positions
401  TMD5* md = new TMD5();
402 
403  TString concatStr;
404  for (Int_t i=0; i < block->GetNoElem(); i++){
405  concatStr.Form("%lld", block->GetPos(i));
406  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
407  }
408 
409  md->Final();
410  TString fileName( md->AsString() );
411  value = SumHex(fileName);
412  value = value % 16;
413  TString dirName;
414  dirName.Form("%i", value);
415 
416  fullPath += "/" + dirName + "/" + fileName;
417 
418  FileStat_t stat;
419  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
420  path = new char[fullPath.Length() + 1];
421  strlcpy(path, fullPath,fullPath.Length() + 1);
422  found = true;
423  } else
424  found = false;
425 
426  delete md;
427  return found;
428 }
429 
430 ////////////////////////////////////////////////////////////////////////////////
431 /// Return a buffer from cache.
432 
433 char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
434 {
435  char *buffer = 0;
436  TString strPath = path;
437 
438  strPath += "?filetype=raw";
439  TFile* file = new TFile(strPath);
440 
441  Double_t start = 0;
442  if (gPerfStats != 0) start = TTimeStamp();
443 
444  buffer = (char*) calloc(length, sizeof(char));
445  file->ReadBuffer(buffer, 0, length);
446 
447  fFile->fBytesRead += length;
448  fFile->fgBytesRead += length;
449  fFile->SetReadCalls(fFile->GetReadCalls() + 1);
450  fFile->fgReadCalls++;
451 
452  if (gMonitoringWriter)
454  if (gPerfStats != 0) {
455  gPerfStats->FileReadEvent(fFile, length, start);
456  }
457 
458  file->Close();
459  delete file;
460  return buffer;
461 }
462 
463 ////////////////////////////////////////////////////////////////////////////////
464 /// Save the block content in cache.
465 
467 {
468  if (fPathCache == "")
469  return;
470 
471 //dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 digest of the block's element positions
472  TMD5* md = new TMD5();
473 
474  TString concatStr;
475  for(Int_t i=0; i< block->GetNoElem(); i++){
476  concatStr.Form("%lld", block->GetPos(i));
477  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
478  }
479  md->Final();
480 
481  TString fileName( md->AsString() );
482  Int_t value = SumHex(fileName);
483  value = value % 16;
484 
485  TString fullPath( fPathCache );
486  TString dirName;
487  dirName.Form("%i", value);
488  fullPath += ("/" + dirName);
489 
490  if (!gSystem->OpenDirectory(fullPath))
491  gSystem->mkdir(fullPath);
492 
493  TFile* file = 0;
494  fullPath += ("/" + fileName);
495  FileStat_t stat;
496  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
497  fullPath += "?filetype=raw";
498  file = TFile::Open(fullPath, "update");
499  } else{
500  fullPath += "?filetype=raw";
501  file = TFile::Open(fullPath, "new");
502  }
503 
504  if (file) {
505 // coverity[unchecked_value] We print no error message, have no error
506 // return code, and close the file anyway, so there is no need to check the return value.
507  file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
508  file->Close();
509  delete file;
510  }
511  delete md;
512 }
513 
514 
515 ////////////////////////////////////////////////////////////////////////////////
516 /// Set the path of the cache directory.
517 
519 {
520  fPathCache = path;
521 
522  if (!gSystem->OpenDirectory(path)){
523  return (!gSystem->mkdir(path) ? true : false);
524  }
525 
526  // Directory already exists
527  return true;
528 }
529 
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:59
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
long long Long64_t
Definition: RtypesCore.h:69
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t mid
Definition: TRolke.cxx:630
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:167
Bool_t IsPrefetchFinished() const
return c
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1364
int xtod(char c)
static const int kMAX_READ_SIZE
Long64_t GetWaitTime()
Return the time spent waiting for a buffer to be read, in microseconds.
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:220
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:87
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:111
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
STL namespace.
Int_t ThreadStart()
Used to start the consumer thread.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:901
char * GetBuffer() const
Get block buffer.
Definition: TFPBlock.h:105
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3907
#define SafeDelete(p)
Definition: RConfig.h:507
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition: TFPBlock.cxx:72
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
void SetBuffer(char *)
Set block buffer.
Definition: TFPBlock.cxx:81
void SetFile(TFile *)
Change the file.
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:24
A doubly linked list.
Definition: TList.h:47
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:108
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
void WaitFinishPrefetch()
Killing the async prefetching thread.
TObject * Next()
Definition: TCollection.h:158
#define calloc
Definition: civetweb.c:819
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:93
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
#define gPerfStats
Long64_t GetDataSize() const
Return size of the data in the block.
Definition: TFPBlock.h:75
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
virtual ~TFilePrefetch()
Destructor.
#define ClassImp(name)
Definition: Rtypes.h:279
double Double_t
Definition: RtypesCore.h:55
TThread * GetThread() const
Return reference to the consumer thread.
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:76
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:51
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:99
virtual Bool_t SendFileReadProgress(TFile *)
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
Definition: file.py:1
Bool_t SetCache(const char *)
Set the path of the cache directory.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:831
double result[121]
unsigned char UChar_t
Definition: RtypesCore.h:34
Definition: first.py:1
const Bool_t kTRUE
Definition: Rtypes.h:91
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.