Logo ROOT  
Reference Guide
TFilePrefetch.cxx
Go to the documentation of this file.
1// @(#)root/io:$Id$
2// Author: Elvin Sindrilaru 19/05/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "TFilePrefetch.h"
13#include "TTimeStamp.h"
14#include "TSystem.h"
15#include "TMD5.h"
16#include "TVirtualPerfStats.h"
17#include "TVirtualMonitoring.h"
18#include "TSemaphore.h"
19#include "TFPBlock.h"
20#include "strlcpy.h"
21
22#include <string>
23#include <sstream>
24#include <cstdio>
25#include <cstdlib>
26#include <cctype>
27#include <cassert>
28
29static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
30
/// Convert a single hexadecimal digit character to its numeric value (0-15).
/// Accepts '0'-'9', 'A'-'F' and 'a'-'f'; any other character yields 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}
32
33using namespace std;
34
36
37/**
38\class TFilePrefetch
39\ingroup IO
40
41The prefetching mechanism uses two classes (TFilePrefetch and
42TFPBlock) to prefetch in advance a block of tree entries. There is
43a thread which takes care of actually transferring the blocks and
44making them available to the main requesting thread. Therefore,
45the time spent by the main thread waiting for the data before
46processing considerably decreases. Besides the prefetching
47mechanisms there is also a local caching option which can be
48enabled by the user. Both capabilities are disabled by default
49and must be explicitly enabled by the user.
50*/
51
52
53////////////////////////////////////////////////////////////////////////////////
54/// Constructor.
55
57 fFile(file),
58 fConsumer(0),
59 fThreadJoined(kTRUE),
60 fPrefetchFinished(kFALSE)
61{
62 fPendingBlocks = new TList();
63 fReadBlocks = new TList();
64
67
69}
70
71////////////////////////////////////////////////////////////////////////////////
72/// Destructor
73
75{
76 if (!fThreadJoined) {
78 }
79
84}
85
86
87////////////////////////////////////////////////////////////////////////////////
88/// Killing the async prefetching thread
89
91{
92 // Inform the consumer thread that prefetching is over
93 {
94 std::lock_guard<std::mutex> lk(fMutexPendingList);
96 }
97 fNewBlockAdded.notify_one();
98
99 fConsumer->Join();
102}
103
104
105////////////////////////////////////////////////////////////////////////////////
106/// Read one block and insert it in prefetchBuffers list.
107
109{
110 char* path = 0;
111
112 if (CheckBlockInCache(path, block)){
113 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
114 inCache = kTRUE;
115 }
116 else{
117 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
118 if (fFile->GetArchive()) {
119 for (Int_t i = 0; i < block->GetNoElem(); i++)
120 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
121 }
122 inCache =kFALSE;
123 }
124 delete[] path;
125}
126
127////////////////////////////////////////////////////////////////////////////////
128/// Get blocks specified in prefetchBlocks.
129
131{
132 Bool_t inCache = kFALSE;
133 TFPBlock* block = 0;
134
135 while((block = GetPendingBlock())){
136 ReadAsync(block, inCache);
137 AddReadBlock(block);
138 if (!inCache)
139 SaveBlockInCache(block);
140 }
141}
142
143////////////////////////////////////////////////////////////////////////////////
144/// Search for a requested element in a block and return the index.
145
147{
148 Int_t first = 0, last = -1, mid = -1;
149 last = (Int_t) blockObj->GetNoElem()-1;
150
151 while (first <= last){
152 mid = first + (last - first) / 2;
153 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
154 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
155
156 *index = mid;
157 return true;
158 }
159 else if (blockObj->GetPos(mid) < offset){
160 first = mid + 1;
161 }
162 else{
163 last = mid - 1;
164 }
165 }
166 return false;
167}
168
169////////////////////////////////////////////////////////////////////////////////
170/// Return the time spent waiting for a buffer to be read, in microseconds.
171
173{
174 return Long64_t(fWaitTime.RealTime()*1.e+6);
175}
176
177////////////////////////////////////////////////////////////////////////////////
178/// Return a prefetched element.
179
181{
182 Bool_t found = false;
183 TFPBlock* blockObj = 0;
184 Int_t index = -1;
185
186 std::unique_lock<std::mutex> lk(fMutexReadList);
187 while (1){
188 TIter iter(fReadBlocks);
189 while ((blockObj = (TFPBlock*) iter.Next())){
190 index = -1;
191 if (BinarySearchReadList(blockObj, offset, len, &index)){
192 found = true;
193 break;
194 }
195 }
196 if (found)
197 break;
198 else{
200 fReadBlockAdded.wait(lk); //wait for a new block to be added
201 fWaitTime.Stop();
202 }
203 }
204
205 if (found){
206 char *pBuff = blockObj->GetPtrToPiece(index);
207 pBuff += (offset - blockObj->GetPos(index));
208 memcpy(buf, pBuff, len);
209 }
210 return found;
211}
212
213////////////////////////////////////////////////////////////////////////////////
214/// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
215
216void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
217{
218 TFPBlock* block = CreateBlockObj(offset, len, nblock);
219 AddPendingBlock(block);
220}
221
222////////////////////////////////////////////////////////////////////////////////
223/// Safe method to add a block to the pendingList.
224
226{
227 fMutexPendingList.lock();
228 fPendingBlocks->Add(block);
229 fMutexPendingList.unlock();
230
231 fNewBlockAdded.notify_one();
232}
233
234////////////////////////////////////////////////////////////////////////////////
235/// Safe method to remove a block from the pendingList.
236
238{
239 TFPBlock* block = 0;
240
241 // Use the semaphore to deal with the case when the file pointer
242 // is changed on the fly by TChain
244 std::unique_lock<std::mutex> lk(fMutexPendingList);
245 // Wait unless there is a pending block or prefetching is over
246 fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
247 lk.unlock();
249
250 lk.lock();
251 if (fPendingBlocks->GetSize()){
252 block = (TFPBlock*)fPendingBlocks->First();
253 block = (TFPBlock*)fPendingBlocks->Remove(block);
254 }
255 return block;
256}
257
258////////////////////////////////////////////////////////////////////////////////
259/// Safe method to add a block to the readList.
260
262{
263 fMutexReadList.lock();
264
266 TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
267 movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
268 delete movedBlock;
269 movedBlock = 0;
270 }
271
272 fReadBlocks->Add(block);
273 fMutexReadList.unlock();
274
275 //signal the addition of a new block
276 fReadBlockAdded.notify_one();
277}
278
279
280////////////////////////////////////////////////////////////////////////////////
281/// Create a new block or recycle an old one.
282
284{
285 TFPBlock* blockObj = 0;
286
287 fMutexReadList.lock();
288
290 blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
291 fReadBlocks->Remove(blockObj);
292 fMutexReadList.unlock();
293 blockObj->ReallocBlock(offset, len, noblock);
294 }
295 else{
296 fMutexReadList.unlock();
297 blockObj = new TFPBlock(offset, len, noblock);
298 }
299 return blockObj;
300}
301
302////////////////////////////////////////////////////////////////////////////////
303/// Return reference to the consumer thread.
304
306{
307 return fConsumer;
308}
309
310
311////////////////////////////////////////////////////////////////////////////////
312/// Change the file
313///
314/// When prefetching is enabled we also need to:
315/// - make sure the async thread is not doing any work
316/// - clear all blocks from prefetching and read list
317/// - reset the file pointer
318
320{
321 if (action == TFile::kDisconnect) {
322 if (!fThreadJoined) {
324 }
325
326 if (fFile) {
327 // Remove all pending and read blocks
328 fMutexPendingList.lock();
330 fMutexPendingList.unlock();
331
332 fMutexReadList.lock();
334 fMutexReadList.unlock();
335 }
336
337 fFile = file;
338 if (!fThreadJoined) {
340 }
341 } else {
342 // kDoNotDisconnect must reconnect to the same file
343 assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
344 }
345}
346
347
348////////////////////////////////////////////////////////////////////////////////
349/// Used to start the consumer thread.
350
352{
353 int rc;
354
355 fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
356 rc = fConsumer->Run();
357 if ( !rc ) {
359 }
360 return rc;
361}
362
363
364////////////////////////////////////////////////////////////////////////////////
365/// Execution loop of the consumer thread.
366
368{
369 TFilePrefetch* pClass = (TFilePrefetch*) arg;
370
371 while (!pClass->IsPrefetchFinished()) {
372 pClass->ReadListOfBlocks();
373 }
374
375 return (TThread::VoidRtnFunc_t) 1;
376}
377
378//############################# CACHING PART ###################################
379
380////////////////////////////////////////////////////////////////////////////////
381/// Sum up individual hex values to obtain a decimal value.
382
384{
385 Int_t result = 0;
386 const char* ptr = hex;
387
388 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
389 result += xtod(ptr[i]);
390
391 return result;
392}
393
394////////////////////////////////////////////////////////////////////////////////
395/// Test if the block is in cache.
396
398{
399 if (fPathCache == "")
400 return false;
401
402 Bool_t found = false;
403 TString fullPath(fPathCache); // path of the cached files.
404
405 Int_t value = 0;
406
407 if (!gSystem->OpenDirectory(fullPath))
408 gSystem->mkdir(fullPath);
409
410 // dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 digest of the block's element offsets
411 TMD5* md = new TMD5();
412
413 TString concatStr;
414 for (Int_t i=0; i < block->GetNoElem(); i++){
415 concatStr.Form("%lld", block->GetPos(i));
416 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
417 }
418
419 md->Final();
420 TString fileName( md->AsString() );
421 value = SumHex(fileName);
422 value = value % 16;
423 TString dirName;
424 dirName.Form("%i", value);
425
426 fullPath += "/" + dirName + "/" + fileName;
427
428 FileStat_t stat;
429 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
430 path = new char[fullPath.Length() + 1];
431 strlcpy(path, fullPath,fullPath.Length() + 1);
432 found = true;
433 } else
434 found = false;
435
436 delete md;
437 return found;
438}
439
440////////////////////////////////////////////////////////////////////////////////
441/// Return a buffer from cache.
442
443char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
444{
// Read `length` bytes from the start of the cache file at `path` into a
// freshly calloc'ed buffer and return that buffer.
//
// path   - location of the cached block; "?filetype=raw" is appended before
//          the file is opened.
// length - number of bytes to allocate and to read from offset 0.
//
// Returns the calloc'ed buffer. ReadAsync passes it to TFPBlock::SetBuffer;
// presumably the block then owns and frees it - TODO confirm.
//
// NOTE(review): the result of opening the file, the calloc result and the
// return value of ReadBuffer are not checked here.
445 char *buffer = 0;
446 TString strPath = path;
447
448 strPath += "?filetype=raw";
449 TFile* file = new TFile(strPath);
450
// Take the start timestamp only when performance statistics are enabled.
451 Double_t start = 0;
452 if (gPerfStats != 0) start = TTimeStamp();
453
// Zero-initialised buffer, filled from the beginning of the cache file.
454 buffer = (char*) calloc(length, sizeof(char));
455 file->ReadBuffer(buffer, 0, length);
456
// Account the cached bytes on the prefetched file (fFile), not on the
// temporary cache file.
457 fFile->fBytesRead += length;
458 fFile->fgBytesRead += length;
// NOTE(review): the embedded listing numbers skip here (458 -> 461 -> 464),
// so statements may be missing from this copy - compare against upstream.
461
// Report the read to the performance statistics, attributed to fFile.
464 if (gPerfStats != 0) {
465 gPerfStats->FileReadEvent(fFile, length, start);
466 }
467
// The cache file is only needed for this one read.
468 file->Close();
469 delete file;
470 return buffer;
471}
472
473////////////////////////////////////////////////////////////////////////////////
474/// Save the block content in cache.
475
477{
478 if (fPathCache == "")
479 return;
480
481 // dir is the sum of the MD5 hex digits modulo 16; filename is the MD5 digest of the block's element offsets
482 TMD5* md = new TMD5();
483
484 TString concatStr;
485 for(Int_t i=0; i< block->GetNoElem(); i++){
486 concatStr.Form("%lld", block->GetPos(i));
487 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
488 }
489 md->Final();
490
491 TString fileName( md->AsString() );
492 Int_t value = SumHex(fileName);
493 value = value % 16;
494
495 TString fullPath( fPathCache );
496 TString dirName;
497 dirName.Form("%i", value);
498 fullPath += ("/" + dirName);
499
500 if (!gSystem->OpenDirectory(fullPath))
501 gSystem->mkdir(fullPath);
502
503 TFile* file = 0;
504 fullPath += ("/" + fileName);
505 FileStat_t stat;
506 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
507 fullPath += "?filetype=raw";
508 file = TFile::Open(fullPath, "update");
509 } else{
510 fullPath += "?filetype=raw";
511 file = TFile::Open(fullPath, "new");
512 }
513
514 if (file) {
515 // coverity[unchecked_value] We do not print error message, have not error
516 // return code and close the file anyway, not need to check the return value.
517 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
518 file->Close();
519 delete file;
520 }
521 delete md;
522}
523
524
525////////////////////////////////////////////////////////////////////////////////
526/// Set the path of the cache directory.
527
529{
530 fPathCache = path;
531
532 if (!gSystem->OpenDirectory(path)){
533 return (!gSystem->mkdir(path) ? true : false);
534 }
535
536 // Directory already exists
537 return true;
538}
539
#define SafeDelete(p)
Definition: RConfig.hxx:534
#define c(i)
Definition: RSha256.hxx:101
int Int_t
Definition: RtypesCore.h:45
unsigned char UChar_t
Definition: RtypesCore.h:38
const Bool_t kFALSE
Definition: RtypesCore.h:101
bool Bool_t
Definition: RtypesCore.h:63
double Double_t
Definition: RtypesCore.h:59
long long Long64_t
Definition: RtypesCore.h:80
const Bool_t kTRUE
Definition: RtypesCore.h:100
#define ClassImp(name)
Definition: Rtypes.h:364
static const int kMAX_READ_SIZE
int xtod(char c)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:559
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
#define gPerfStats
#define calloc
Definition: civetweb.c:1537
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:184
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:22
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:109
void SetPos(Int_t, Long64_t)
Set pos value for index idx.
Definition: TFPBlock.cxx:72
void SetBuffer(char *)
Set block buffer.
Definition: TFPBlock.cxx:81
Long64_t GetDataSize() const
Return size of the data in the block.
Definition: TFPBlock.h:73
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:97
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:85
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:91
char * GetBuffer() const
Get block buffer.
Definition: TFPBlock.h:103
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:33
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
TFilePrefetch(TFile *)
Constructor.
TSemaphore * fSemChangeFile
semaphore used when changing a file in TChain
Definition: TFilePrefetch.h:44
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
TThread * fConsumer
consumer thread
Definition: TFilePrefetch.h:39
std::condition_variable fNewBlockAdded
signal the addition of a new pending block
Definition: TFilePrefetch.h:42
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
Long64_t GetWaitTime()
Return the time spent waiting for a buffer to be read, in microseconds.
Bool_t fThreadJoined
mark if async thread was joined
Definition: TFilePrefetch.h:47
Bool_t SetCache(const char *)
Set the path of the cache directory.
TList * fReadBlocks
list of blocks read
Definition: TFilePrefetch.h:38
Int_t ThreadStart()
Used to start the consumer thread.
std::atomic< Bool_t > fPrefetchFinished
true if prefetching is over
Definition: TFilePrefetch.h:48
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
virtual ~TFilePrefetch()
Destructor.
TString fPathCache
path to the cache directory
Definition: TFilePrefetch.h:45
std::mutex fMutexReadList
mutex for the list of read blocks
Definition: TFilePrefetch.h:41
TStopwatch fWaitTime
time waiting to prefetch a buffer (in usec)
Definition: TFilePrefetch.h:46
std::mutex fMutexPendingList
mutex for the pending list
Definition: TFilePrefetch.h:40
std::condition_variable fReadBlockAdded
signal the addition of a new read block
Definition: TFilePrefetch.h:43
void WaitFinishPrefetch()
Killing the async prefetching thread.
TThread * GetThread() const
Return reference to the consumer thread.
TFile * fFile
reference to the file
Definition: TFilePrefetch.h:36
TList * fPendingBlocks
list of pending blocks to be read
Definition: TFilePrefetch.h:37
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
Bool_t IsPrefetchFinished() const
Definition: TFilePrefetch.h:82
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:54
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
Definition: TFile.h:130
Long64_t fBytesRead
Number of bytes read from this file.
Definition: TFile.h:77
Long64_t GetArchiveOffset() const
Definition: TFile.h:213
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Read the nbuf blocks described in arrays pos and len.
Definition: TFile.cxx:1747
virtual Int_t GetReadCalls() const
Definition: TFile.h:236
virtual void SetReadCalls(Int_t readcalls=0)
Definition: TFile.h:282
ECacheAction
TTreeCache flushing semantics.
Definition: TFile.h:71
@ kDisconnect
Definition: TFile.h:71
TArchiveFile * GetArchive() const
Definition: TFile.h:212
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:4011
static std::atomic< Int_t > fgReadCalls
Number of bytes read from all TFile objects.
Definition: TFile.h:132
TObject * Next()
Definition: TCollection.h:251
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:822
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:659
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:402
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:44
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:220
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:108
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:167
Int_t Post()
Increment the value of the semaphore.
Definition: TSemaphore.cxx:103
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Definition: TSemaphore.cxx:35
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:77
Basic string class.
Definition: TString.h:136
Ssiz_t Length() const
Definition: TString.h:410
const char * Data() const
Definition: TString.h:369
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2314
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:837
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:907
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1398
Long_t Join(void **ret=nullptr)
Join this thread.
Definition: TThread.cxx:515
Int_t Run(void *arg=nullptr, const int affinity=-1)
Start the thread.
Definition: TThread.cxx:571
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:52
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:71
virtual Bool_t SendFileReadProgress(TFile *)
Definition: file.py:1
Definition: first.py:1