ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TFilePrefetch.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Elvin Sindrilaru 19/05/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "TFilePrefetch.h"
13 #include "TTimeStamp.h"
14 #include "TVirtualPerfStats.h"
15 #include "TVirtualMonitoring.h"
16 
17 #include <iostream>
18 #include <string>
19 #include <sstream>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cctype>
23 
// Maximum number of already-read blocks kept in the read list.
static const int kMAX_READ_SIZE = 2;

/// Convert a single hexadecimal digit ('0'-'9', 'A'-'F', 'a'-'f') to its
/// numeric value. Any other character yields 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}

using namespace std;
29 
31 
32 /**
33 \class TFilePrefetch
34 \ingroup IO
35 
36 The prefetching mechanism uses two classes (TFilePrefetch and
37 TFPBlock) to prefetch in advance a block of tree entries. There is
38 a thread which takes care of actually transferring the blocks and
39 making them available to the main requesting thread. Therefore,
40 the time spent by the main thread waiting for the data before
41 processing considerably decreases. Besides the prefetching
42 mechanisms there is also a local caching option which can be
43 enabled by the user. Both capabilities are disabled by default
44 and must be explicitly enabled by the user.
45 */
46 
47 
48 ////////////////////////////////////////////////////////////////////////////////
49 /// Constructor.
50 
51 TFilePrefetch::TFilePrefetch(TFile* file) :
52  fFile(file),
53  fConsumer(0),
54  fThreadJoined(kTRUE),
55  fPrefetchFinished(kFALSE)
56 {
57  fPendingBlocks = new TList();
58  fReadBlocks = new TList();
59 
60  fPendingBlocks->SetOwner();
61  fReadBlocks->SetOwner();
62 
63  fSemChangeFile = new TSemaphore(0);
64 }
65 
66 ////////////////////////////////////////////////////////////////////////////////
67 /// Destructor
68 
70 {
71  if (!fThreadJoined) {
72  WaitFinishPrefetch();
73  }
74 
75  SafeDelete(fConsumer);
76  SafeDelete(fPendingBlocks);
77  SafeDelete(fReadBlocks);
78  SafeDelete(fSemChangeFile);
79 }
80 
81 
82 ////////////////////////////////////////////////////////////////////////////////
83 /// Killing the async prefetching thread
84 
86 {
87  // Inform the consumer thread that prefetching is over
88  {
89  std::lock_guard<std::mutex> lk(fMutexPendingList);
90  fPrefetchFinished = kTRUE;
91  }
92  fNewBlockAdded.notify_one();
93 
94  fConsumer->Join();
95  fThreadJoined = kTRUE;
96  fPrefetchFinished = kFALSE;
97 }
98 
99 
100 ////////////////////////////////////////////////////////////////////////////////
101 /// Read one block and insert it in prefetchBuffers list.
102 
104 {
105  char* path = 0;
106 
107  if (CheckBlockInCache(path, block)){
108  block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
109  inCache = kTRUE;
110  }
111  else{
112  fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
113  if (fFile->GetArchive()) {
114  for (Int_t i = 0; i < block->GetNoElem(); i++)
115  block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
116  }
117  inCache =kFALSE;
118  }
119  delete[] path;
120 }
121 
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Get blocks specified in prefetchBlocks.
124 
126 {
127  Bool_t inCache = kFALSE;
128  TFPBlock* block = 0;
129 
130  while((block = GetPendingBlock())){
131  ReadAsync(block, inCache);
132  AddReadBlock(block);
133  if (!inCache)
134  SaveBlockInCache(block);
135  }
136 }
137 
138 ////////////////////////////////////////////////////////////////////////////////
139 /// Search for a requested element in a block and return the index.
140 
142 {
143  Int_t first = 0, last = -1, mid = -1;
144  last = (Int_t) blockObj->GetNoElem()-1;
145 
146  while (first <= last){
147  mid = first + (last - first) / 2;
148  if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
149  && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
150 
151  *index = mid;
152  return true;
153  }
154  else if (blockObj->GetPos(mid) < offset){
155  first = mid + 1;
156  }
157  else{
158  last = mid - 1;
159  }
160  }
161  return false;
162 }
163 
164 ////////////////////////////////////////////////////////////////////////////////
165 /// Return the time spent wating for buffer to be read in microseconds.
166 
168 {
169  return Long64_t(fWaitTime.RealTime()*1.e+6);
170 }
171 
172 ////////////////////////////////////////////////////////////////////////////////
173 /// Return a prefetched element.
174 
176 {
177  Bool_t found = false;
178  TFPBlock* blockObj = 0;
179  Int_t index = -1;
180 
181  std::unique_lock<std::mutex> lk(fMutexReadList);
182  while (1){
183  TIter iter(fReadBlocks);
184  while ((blockObj = (TFPBlock*) iter.Next())){
185  index = -1;
186  if (BinarySearchReadList(blockObj, offset, len, &index)){
187  found = true;
188  break;
189  }
190  }
191  if (found)
192  break;
193  else{
194  fWaitTime.Start(kFALSE);
195  fReadBlockAdded.wait(lk); //wait for a new block to be added
196  fWaitTime.Stop();
197  }
198  }
199 
200  if (found){
201  char *pBuff = blockObj->GetPtrToPiece(index);
202  pBuff += (offset - blockObj->GetPos(index));
203  memcpy(buf, pBuff, len);
204  }
205  return found;
206 }
207 
208 ////////////////////////////////////////////////////////////////////////////////
209 /// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
210 
212 {
213  TFPBlock* block = CreateBlockObj(offset, len, nblock);
214  AddPendingBlock(block);
215 }
216 
217 ////////////////////////////////////////////////////////////////////////////////
218 /// Safe method to add a block to the pendingList.
219 
221 {
222  fMutexPendingList.lock();
223  fPendingBlocks->Add(block);
224  fMutexPendingList.unlock();
225 
226  fNewBlockAdded.notify_one();
227 }
228 
229 ////////////////////////////////////////////////////////////////////////////////
230 /// Safe method to remove a block from the pendingList.
231 
233 {
234  TFPBlock* block = 0;
235 
236  // Use the semaphore to deal with the case when the file pointer
237  // is changed on the fly by TChain
238  fSemChangeFile->Post();
239  std::unique_lock<std::mutex> lk(fMutexPendingList);
240  // Wait unless there is a pending block or prefetching is over
241  fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
242  lk.unlock();
243  fSemChangeFile->Wait();
244 
245  lk.lock();
246  if (fPendingBlocks->GetSize()){
247  block = (TFPBlock*)fPendingBlocks->First();
248  block = (TFPBlock*)fPendingBlocks->Remove(block);
249  }
250  return block;
251 }
252 
253 ////////////////////////////////////////////////////////////////////////////////
254 /// Safe method to add a block to the readList.
255 
257 {
258  fMutexReadList.lock();
259 
260  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
261  TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
262  movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
263  delete movedBlock;
264  movedBlock = 0;
265  }
266 
267  fReadBlocks->Add(block);
268  fMutexReadList.unlock();
269 
270  //signal the addition of a new block
271  fReadBlockAdded.notify_one();
272 }
273 
274 
275 ////////////////////////////////////////////////////////////////////////////////
276 /// Create a new block or recycle an old one.
277 
279 {
280  TFPBlock* blockObj = 0;
281 
282  fMutexReadList.lock();
283 
284  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
285  blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
286  fReadBlocks->Remove(blockObj);
287  fMutexReadList.unlock();
288  blockObj->ReallocBlock(offset, len, noblock);
289  }
290  else{
291  fMutexReadList.unlock();
292  blockObj = new TFPBlock(offset, len, noblock);
293  }
294  return blockObj;
295 }
296 
297 ////////////////////////////////////////////////////////////////////////////////
298 /// Return reference to the consumer thread.
299 
301 {
302  return fConsumer;
303 }
304 
305 
306 ////////////////////////////////////////////////////////////////////////////////
307 /// Change the file
308 ///
309 /// When prefetching is enabled we also need to:
310 /// - make sure the async thread is not doing any work
311 /// - clear all blocks from prefetching and read list
312 /// - reset the file pointer
313 
314 void TFilePrefetch::SetFile(TFile *file)
315 {
316  if (!fThreadJoined) {
317  fSemChangeFile->Wait();
318  }
319 
320  if (fFile) {
321  // Remove all pending and read blocks
322  fMutexPendingList.lock();
323  fPendingBlocks->Clear();
324  fMutexPendingList.unlock();
325 
326  fMutexReadList.lock();
327  fReadBlocks->Clear();
328  fMutexReadList.unlock();
329  }
330 
331  fFile = file;
332  if (!fThreadJoined) {
333  fSemChangeFile->Post();
334  }
335 }
336 
337 
338 ////////////////////////////////////////////////////////////////////////////////
339 /// Used to start the consumer thread.
340 
342 {
343  int rc;
344 
345  fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
346  rc = fConsumer->Run();
347  if ( !rc ) {
348  fThreadJoined = kFALSE;
349  }
350  return rc;
351 }
352 
353 
354 ////////////////////////////////////////////////////////////////////////////////
355 /// Execution loop of the consumer thread.
356 
358 {
359  TFilePrefetch* pClass = (TFilePrefetch*) arg;
360 
361  while (!pClass->IsPrefetchFinished()) {
362  pClass->ReadListOfBlocks();
363  }
364 
365  return (TThread::VoidRtnFunc_t) 1;
366 }
367 
368 //############################# CACHING PART ###################################
369 
370 ////////////////////////////////////////////////////////////////////////////////
371 /// Sum up individual hex values to obtain a decimal value.
372 
374 {
375  Int_t result = 0;
376  const char* ptr = hex;
377 
378  for(Int_t i=0; i < (Int_t)strlen(hex); i++)
379  result += xtod(ptr[i]);
380 
381  return result;
382 }
383 
384 ////////////////////////////////////////////////////////////////////////////////
385 /// Test if the block is in cache.
386 
388 {
389  if (fPathCache == "")
390  return false;
391 
392  Bool_t found = false;
393  TString fullPath(fPathCache); // path of the cached files.
394 
395  Int_t value = 0;
396 
397  if (!gSystem->OpenDirectory(fullPath))
398  gSystem->mkdir(fullPath);
399 
400  //dir is SHA1 value modulo 16; filename is the value of the SHA1(offset+len)
401  TMD5* md = new TMD5();
402 
403  TString concatStr;
404  for (Int_t i=0; i < block->GetNoElem(); i++){
405  concatStr.Form("%lld", block->GetPos(i));
406  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
407  }
408 
409  md->Final();
410  TString fileName( md->AsString() );
411  value = SumHex(fileName);
412  value = value % 16;
414  dirName.Form("%i", value);
415 
416  fullPath += "/" + dirName + "/" + fileName;
417 
418  FileStat_t stat;
419  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
420  path = new char[fullPath.Length() + 1];
421  strlcpy(path, fullPath,fullPath.Length() + 1);
422  found = true;
423  } else
424  found = false;
425 
426  delete md;
427  return found;
428 }
429 
430 ////////////////////////////////////////////////////////////////////////////////
431 /// Return a buffer from cache.
432 
434 {
435  char *buffer = 0;
436  TString strPath = path;
437 
438  strPath += "?filetype=raw";
439  TFile* file = new TFile(strPath);
440 
441  Double_t start = 0;
442  if (gPerfStats != 0) start = TTimeStamp();
443 
444  buffer = (char*) calloc(length, sizeof(char));
445  file->ReadBuffer(buffer, 0, length);
446 
447  fFile->fBytesRead += length;
448  fFile->fgBytesRead += length;
449  fFile->SetReadCalls(fFile->GetReadCalls() + 1);
450  fFile->fgReadCalls++;
451 
452  if (gMonitoringWriter)
454  if (gPerfStats != 0) {
455  gPerfStats->FileReadEvent(fFile, length, start);
456  }
457 
458  file->Close();
459  delete file;
460  return buffer;
461 }
462 
463 ////////////////////////////////////////////////////////////////////////////////
464 /// Save the block content in cache.
465 
467 {
468  if (fPathCache == "")
469  return;
470 
471  //dir is SHA1 value modulo 16; filename is the value of the SHA1
472  TMD5* md = new TMD5();
473 
474  TString concatStr;
475  for(Int_t i=0; i< block->GetNoElem(); i++){
476  concatStr.Form("%lld", block->GetPos(i));
477  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
478  }
479  md->Final();
480 
481  TString fileName( md->AsString() );
482  Int_t value = SumHex(fileName);
483  value = value % 16;
484 
485  TString fullPath( fPathCache );
487  dirName.Form("%i", value);
488  fullPath += ("/" + dirName);
489 
490  if (!gSystem->OpenDirectory(fullPath))
491  gSystem->mkdir(fullPath);
492 
493  TFile* file = 0;
494  fullPath += ("/" + fileName);
495  FileStat_t stat;
496  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
497  fullPath += "?filetype=raw";
498  file = TFile::Open(fullPath, "update");
499  } else{
500  fullPath += "?filetype=raw";
501  file = TFile::Open(fullPath, "new");
502  }
503 
504  if (file) {
505  // coverity[unchecked_value] We do not print error message, have not error
506  // return code and close the file anyway, not need to check the return value.
507  file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
508  file->Close();
509  delete file;
510  }
511  delete md;
512 }
513 
514 
515 ////////////////////////////////////////////////////////////////////////////////
516 /// Set the path of the cache directory.
517 
519 {
520  fPathCache = path;
521 
522  if (!gSystem->OpenDirectory(path)){
523  return (!gSystem->mkdir(path) ? true : false);
524  }
525 
526  // Directory already exists
527  return true;
528 }
529 
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
tuple buffer
Definition: tree.py:99
long long Long64_t
Definition: RtypesCore.h:69
unsigned int hex
Definition: math.cpp:442
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:166
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:111
Ssiz_t Length() const
Definition: TString.h:390
return c
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
tuple offset
Definition: tree.py:93
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1311
int xtod(char c)
static const int kMAX_READ_SIZE
Long64_t GetWaitTime()
Return the time spent waiting for a buffer to be read, in microseconds.
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
Int_t ThreadStart()
Used to start the consumer thread.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:884
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
const char * Data() const
Definition: TString.h:349
#define SafeDelete(p)
Definition: RConfig.h:436
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition: TFPBlock.cxx:72
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:219
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:46
std::map< std::string, std::string >::const_iterator iter
Definition: TAlienJob.cxx:54
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
Bool_t IsPrefetchFinished() const
void SetBuffer(char *)
Set block buffer.
Definition: TFPBlock.cxx:81
void SetFile(TFile *)
Change the file.
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:24
A doubly linked list.
Definition: TList.h:47
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
TThread * GetThread() const
Return reference to the consumer thread.
char * GetBuffer() const
Get block buffer.
Definition: TFPBlock.h:105
TString dirName
Definition: demos.C:9
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:107
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
void WaitFinishPrefetch()
Kill the async prefetching thread.
TObject * Next()
Definition: TCollection.h:158
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2308
bool first
Definition: line3Dfit.C:48
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:99
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
#define gPerfStats
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
virtual ~TFilePrefetch()
Destructor.
#define ClassImp(name)
Definition: Rtypes.h:279
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:59
tuple file
Definition: fildir.py:20
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:93
double Double_t
Definition: RtypesCore.h:55
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:76
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:51
virtual Bool_t SendFileReadProgress(TFile *)
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:87
Bool_t SetCache(const char *)
Set the path of the cache directory.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:830
Long64_t GetDataSize() const
Return size of the data in the block.
Definition: TFPBlock.h:75
double result[121]
unsigned char UChar_t
Definition: RtypesCore.h:34
const Bool_t kTRUE
Definition: Rtypes.h:91
float value
Definition: math.cpp:443
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.