Logo ROOT  
Reference Guide
TFilePrefetch.cxx
Go to the documentation of this file.
1// @(#)root/io:$Id$
2// Author: Elvin Sindrilaru 19/05/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "TFilePrefetch.h"
13#include "TTimeStamp.h"
14#include "TSystem.h"
15#include "TMD5.h"
16#include "TVirtualPerfStats.h"
17#include "TVirtualMonitoring.h"
18
19#include <iostream>
20#include <string>
21#include <sstream>
22#include <cstdio>
23#include <cstdlib>
24#include <cctype>
25#include <cassert>
26
27static const int kMAX_READ_SIZE = 2; //maximum size of the read list of blocks
28
/// Convert one hexadecimal digit character to its integer value.
///
/// \param c character to decode; '0'-'9', 'A'-'F' and 'a'-'f' are recognized
/// \return the digit value in [0, 15], or 0 when c is not a hexadecimal digit
inline int xtod(char c)
{
   if (c >= '0' && c <= '9') return c - '0';
   if (c >= 'A' && c <= 'F') return c - 'A' + 10;
   if (c >= 'a' && c <= 'f') return c - 'a' + 10;
   return 0; // characters outside the hex alphabet decode to 0, like the digit '0'
}
30
31using namespace std;
32
34
35/**
36\class TFilePrefetch
37\ingroup IO
38
39The prefetching mechanism uses two classes (TFilePrefetch and
40TFPBlock) to prefetch in advance a block of tree entries. There is
41a thread which takes care of actually transferring the blocks and
42making them available to the main requesting thread. Therefore,
43the time spent by the main thread waiting for the data before
44processing considerably decreases. Besides the prefetching
45mechanisms there is also a local caching option which can be
46enabled by the user. Both capabilities are disabled by default
47and must be explicitly enabled by the user.
48*/
49
50
51////////////////////////////////////////////////////////////////////////////////
52/// Constructor.
53
55 fFile(file),
56 fConsumer(0),
57 fThreadJoined(kTRUE),
58 fPrefetchFinished(kFALSE)
59{
60 fPendingBlocks = new TList();
61 fReadBlocks = new TList();
62
65
67}
68
69////////////////////////////////////////////////////////////////////////////////
70/// Destructor
71
73{
74 if (!fThreadJoined) {
76 }
77
82}
83
84
85////////////////////////////////////////////////////////////////////////////////
86/// Killing the async prefetching thread
87
89{
90 // Inform the consumer thread that prefetching is over
91 {
92 std::lock_guard<std::mutex> lk(fMutexPendingList);
94 }
95 fNewBlockAdded.notify_one();
96
97 fConsumer->Join();
100}
101
102
103////////////////////////////////////////////////////////////////////////////////
104/// Read one block and insert it in prefetchBuffers list.
105
107{
108 char* path = 0;
109
110 if (CheckBlockInCache(path, block)){
111 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
112 inCache = kTRUE;
113 }
114 else{
115 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
116 if (fFile->GetArchive()) {
117 for (Int_t i = 0; i < block->GetNoElem(); i++)
118 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
119 }
120 inCache =kFALSE;
121 }
122 delete[] path;
123}
124
125////////////////////////////////////////////////////////////////////////////////
126/// Get blocks specified in prefetchBlocks.
127
129{
130 Bool_t inCache = kFALSE;
131 TFPBlock* block = 0;
132
133 while((block = GetPendingBlock())){
134 ReadAsync(block, inCache);
136 if (!inCache)
138 }
139}
140
141////////////////////////////////////////////////////////////////////////////////
142/// Search for a requested element in a block and return the index.
143
145{
146 Int_t first = 0, last = -1, mid = -1;
147 last = (Int_t) blockObj->GetNoElem()-1;
148
149 while (first <= last){
150 mid = first + (last - first) / 2;
151 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
152 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
153
154 *index = mid;
155 return true;
156 }
157 else if (blockObj->GetPos(mid) < offset){
158 first = mid + 1;
159 }
160 else{
161 last = mid - 1;
162 }
163 }
164 return false;
165}
166
167////////////////////////////////////////////////////////////////////////////////
168/// Return the time spent waiting for the buffer to be read, in microseconds.
169
171{
172 return Long64_t(fWaitTime.RealTime()*1.e+6);
173}
174
175////////////////////////////////////////////////////////////////////////////////
176/// Return a prefetched element.
177
179{
180 Bool_t found = false;
181 TFPBlock* blockObj = 0;
182 Int_t index = -1;
183
184 std::unique_lock<std::mutex> lk(fMutexReadList);
185 while (1){
186 TIter iter(fReadBlocks);
187 while ((blockObj = (TFPBlock*) iter.Next())){
188 index = -1;
189 if (BinarySearchReadList(blockObj, offset, len, &index)){
190 found = true;
191 break;
192 }
193 }
194 if (found)
195 break;
196 else{
198 fReadBlockAdded.wait(lk); //wait for a new block to be added
199 fWaitTime.Stop();
200 }
201 }
202
203 if (found){
204 char *pBuff = blockObj->GetPtrToPiece(index);
205 pBuff += (offset - blockObj->GetPos(index));
206 memcpy(buf, pBuff, len);
207 }
208 return found;
209}
210
211////////////////////////////////////////////////////////////////////////////////
212/// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
213
214void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
215{
216 TFPBlock* block = CreateBlockObj(offset, len, nblock);
218}
219
220////////////////////////////////////////////////////////////////////////////////
221/// Safe method to add a block to the pendingList.
222
224{
225 fMutexPendingList.lock();
227 fMutexPendingList.unlock();
228
229 fNewBlockAdded.notify_one();
230}
231
232////////////////////////////////////////////////////////////////////////////////
233/// Safe method to remove a block from the pendingList.
234
236{
237 TFPBlock* block = 0;
238
239 // Use the semaphore to deal with the case when the file pointer
240 // is changed on the fly by TChain
242 std::unique_lock<std::mutex> lk(fMutexPendingList);
243 // Wait unless there is a pending block or prefetching is over
244 fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
245 lk.unlock();
247
248 lk.lock();
249 if (fPendingBlocks->GetSize()){
252 }
253 return block;
254}
255
256////////////////////////////////////////////////////////////////////////////////
257/// Safe method to add a block to the readList.
258
260{
261 fMutexReadList.lock();
262
264 TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
265 movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
266 delete movedBlock;
267 movedBlock = 0;
268 }
269
271 fMutexReadList.unlock();
272
273 //signal the addition of a new block
274 fReadBlockAdded.notify_one();
275}
276
277
278////////////////////////////////////////////////////////////////////////////////
279/// Create a new block or recycle an old one.
280
282{
283 TFPBlock* blockObj = 0;
284
285 fMutexReadList.lock();
286
288 blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
289 fReadBlocks->Remove(blockObj);
290 fMutexReadList.unlock();
291 blockObj->ReallocBlock(offset, len, noblock);
292 }
293 else{
294 fMutexReadList.unlock();
295 blockObj = new TFPBlock(offset, len, noblock);
296 }
297 return blockObj;
298}
299
300////////////////////////////////////////////////////////////////////////////////
301/// Return reference to the consumer thread.
302
304{
305 return fConsumer;
306}
307
308
309////////////////////////////////////////////////////////////////////////////////
310/// Change the file
311///
312/// When prefetching is enabled we also need to:
313/// - make sure the async thread is not doing any work
314/// - clear all blocks from prefetching and read list
315/// - reset the file pointer
316
318{
319 if (action == TFile::kDisconnect) {
320 if (!fThreadJoined) {
322 }
323
324 if (fFile) {
325 // Remove all pending and read blocks
326 fMutexPendingList.lock();
328 fMutexPendingList.unlock();
329
330 fMutexReadList.lock();
332 fMutexReadList.unlock();
333 }
334
335 fFile = file;
336 if (!fThreadJoined) {
338 }
339 } else {
340 // kDoNotDisconnect must reconnect to the same file
341 assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
342 }
343}
344
345
346////////////////////////////////////////////////////////////////////////////////
347/// Used to start the consumer thread.
348
350{
351 int rc;
352
353 fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
354 rc = fConsumer->Run();
355 if ( !rc ) {
357 }
358 return rc;
359}
360
361
362////////////////////////////////////////////////////////////////////////////////
363/// Execution loop of the consumer thread.
364
366{
367 TFilePrefetch* pClass = (TFilePrefetch*) arg;
368
369 while (!pClass->IsPrefetchFinished()) {
370 pClass->ReadListOfBlocks();
371 }
372
373 return (TThread::VoidRtnFunc_t) 1;
374}
375
376//############################# CACHING PART ###################################
377
378////////////////////////////////////////////////////////////////////////////////
379/// Sum up individual hex values to obtain a decimal value.
380
382{
383 Int_t result = 0;
384 const char* ptr = hex;
385
386 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
387 result += xtod(ptr[i]);
388
389 return result;
390}
391
392////////////////////////////////////////////////////////////////////////////////
393/// Test if the block is in cache.
394
396{
397 if (fPathCache == "")
398 return false;
399
400 Bool_t found = false;
401 TString fullPath(fPathCache); // path of the cached files.
402
403 Int_t value = 0;
404
405 if (!gSystem->OpenDirectory(fullPath))
406 gSystem->mkdir(fullPath);
407
408 // dir is the sum of the MD5 digest's hex digits modulo 16; filename is the MD5 digest of the concatenated block positions
409 TMD5* md = new TMD5();
410
411 TString concatStr;
412 for (Int_t i=0; i < block->GetNoElem(); i++){
413 concatStr.Form("%lld", block->GetPos(i));
414 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
415 }
416
417 md->Final();
418 TString fileName( md->AsString() );
419 value = SumHex(fileName);
420 value = value % 16;
421 TString dirName;
422 dirName.Form("%i", value);
423
424 fullPath += "/" + dirName + "/" + fileName;
425
426 FileStat_t stat;
427 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
428 path = new char[fullPath.Length() + 1];
429 strlcpy(path, fullPath,fullPath.Length() + 1);
430 found = true;
431 } else
432 found = false;
433
434 delete md;
435 return found;
436}
437
438////////////////////////////////////////////////////////////////////////////////
439/// Return a buffer from cache.
440
441char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
442{
443 char *buffer = 0;
444 TString strPath = path;
445
446 strPath += "?filetype=raw";
447 TFile* file = new TFile(strPath);
448
449 Double_t start = 0;
450 if (gPerfStats != 0) start = TTimeStamp();
451
452 buffer = (char*) calloc(length, sizeof(char));
453 file->ReadBuffer(buffer, 0, length);
454
455 fFile->fBytesRead += length;
456 fFile->fgBytesRead += length;
459
462 if (gPerfStats != 0) {
463 gPerfStats->FileReadEvent(fFile, length, start);
464 }
465
466 file->Close();
467 delete file;
468 return buffer;
469}
470
471////////////////////////////////////////////////////////////////////////////////
472/// Save the block content in cache.
473
475{
476 if (fPathCache == "")
477 return;
478
479 // dir is the sum of the MD5 digest's hex digits modulo 16; filename is the MD5 digest of the concatenated block positions
480 TMD5* md = new TMD5();
481
482 TString concatStr;
483 for(Int_t i=0; i< block->GetNoElem(); i++){
484 concatStr.Form("%lld", block->GetPos(i));
485 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
486 }
487 md->Final();
488
489 TString fileName( md->AsString() );
490 Int_t value = SumHex(fileName);
491 value = value % 16;
492
493 TString fullPath( fPathCache );
494 TString dirName;
495 dirName.Form("%i", value);
496 fullPath += ("/" + dirName);
497
498 if (!gSystem->OpenDirectory(fullPath))
499 gSystem->mkdir(fullPath);
500
501 TFile* file = 0;
502 fullPath += ("/" + fileName);
503 FileStat_t stat;
504 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
505 fullPath += "?filetype=raw";
506 file = TFile::Open(fullPath, "update");
507 } else{
508 fullPath += "?filetype=raw";
509 file = TFile::Open(fullPath, "new");
510 }
511
512 if (file) {
513 // coverity[unchecked_value] We do not print an error message, have no error
514 // return code, and close the file anyway, so there is no need to check the return value.
515 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
516 file->Close();
517 delete file;
518 }
519 delete md;
520}
521
522
523////////////////////////////////////////////////////////////////////////////////
524/// Set the path of the cache directory.
525
527{
528 fPathCache = path;
529
530 if (!gSystem->OpenDirectory(path)){
531 return (!gSystem->mkdir(path) ? true : false);
532 }
533
534 // Directory already exists
535 return true;
536}
537
#define SafeDelete(p)
Definition: RConfig.hxx:543
#define c(i)
Definition: RSha256.hxx:101
int Int_t
Definition: RtypesCore.h:43
unsigned char UChar_t
Definition: RtypesCore.h:36
const Bool_t kFALSE
Definition: RtypesCore.h:90
long long Long64_t
Definition: RtypesCore.h:71
const Bool_t kTRUE
Definition: RtypesCore.h:89
#define ClassImp(name)
Definition: Rtypes.h:361
static const int kMAX_READ_SIZE
int xtod(char c)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
#define gPerfStats
#define calloc
Definition: civetweb.c:1537
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
This class represents the encapsulation of a block request.
Definition: TFPBlock.h:22
char * GetPtrToPiece(Int_t index) const
Get block buffer.
Definition: TFPBlock.h:109
void ReallocBlock(Long64_t *, Int_t *, Int_t)
Reallocate the block's buffer based on the length of the elements it will contain.
Definition: TFPBlock.cxx:94
Int_t GetLen(Int_t) const
Get length of the element at index i.
Definition: TFPBlock.h:97
Int_t GetNoElem() const
Return number of elements in the block.
Definition: TFPBlock.h:85
Long64_t GetPos(Int_t) const
Get position of the element at index i.
Definition: TFPBlock.h:91
The prefetching mechanism uses two classes (TFilePrefetch and TFPBlock) to prefetch in advance a bloc...
Definition: TFilePrefetch.h:28
Bool_t BinarySearchReadList(TFPBlock *, Long64_t, Int_t, Int_t *)
Search for a requested element in a block and return the index.
void ReadBlock(Long64_t *, Int_t *, Int_t)
Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
static TThread::VoidRtnFunc_t ThreadProc(void *)
Execution loop of the consumer thread.
Int_t SumHex(const char *)
Sum up individual hex values to obtain a decimal value.
TFilePrefetch(TFile *)
Constructor.
TSemaphore * fSemChangeFile
Definition: TFilePrefetch.h:39
void ReadListOfBlocks()
Get blocks specified in prefetchBlocks.
TFPBlock * CreateBlockObj(Long64_t *, Int_t *, Int_t)
Create a new block or recycle an old one.
TThread * fConsumer
Definition: TFilePrefetch.h:34
std::condition_variable fNewBlockAdded
Definition: TFilePrefetch.h:37
void AddReadBlock(TFPBlock *)
Safe method to add a block to the readList.
Bool_t CheckBlockInCache(char *&, TFPBlock *)
Test if the block is in cache.
Long64_t GetWaitTime()
Return the time spent waiting for the buffer to be read, in microseconds.
Bool_t fThreadJoined
Definition: TFilePrefetch.h:42
Bool_t SetCache(const char *)
Set the path of the cache directory.
TList * fReadBlocks
Definition: TFilePrefetch.h:33
Int_t ThreadStart()
Used to start the consumer thread.
std::atomic< Bool_t > fPrefetchFinished
Definition: TFilePrefetch.h:43
void ReadAsync(TFPBlock *, Bool_t &)
Read one block and insert it in prefetchBuffers list.
TFPBlock * GetPendingBlock()
Safe method to remove a block from the pendingList.
void AddPendingBlock(TFPBlock *)
Safe method to add a block to the pendingList.
Bool_t ReadBuffer(char *, Long64_t, Int_t)
Return a prefetched element.
virtual ~TFilePrefetch()
Destructor.
TString fPathCache
Definition: TFilePrefetch.h:40
std::mutex fMutexReadList
Definition: TFilePrefetch.h:36
TStopwatch fWaitTime
Definition: TFilePrefetch.h:41
std::mutex fMutexPendingList
Definition: TFilePrefetch.h:35
std::condition_variable fReadBlockAdded
Definition: TFilePrefetch.h:38
void WaitFinishPrefetch()
Killing the async prefetching thread.
TThread * GetThread() const
Return reference to the consumer thread.
TList * fPendingBlocks
Definition: TFilePrefetch.h:32
void SetFile(TFile *file, TFile::ECacheAction action=TFile::kDisconnect)
Change the file.
char * GetBlockFromCache(const char *, Int_t)
Return a buffer from cache.
void SaveBlockInCache(TFPBlock *)
Save the block content in cache.
Bool_t IsPrefetchFinished() const
Definition: TFilePrefetch.h:77
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:53
static std::atomic< Long64_t > fgBytesRead
Number of bytes read by all TFile objects.
Definition: TFile.h:129
Long64_t fBytesRead
Number of bytes read from this file.
Definition: TFile.h:76
Long64_t GetArchiveOffset() const
Definition: TFile.h:212
virtual Bool_t ReadBuffers(char *buf, Long64_t *pos, Int_t *len, Int_t nbuf)
Read the nbuf blocks described in arrays pos and len.
Definition: TFile.cxx:1682
virtual Int_t GetReadCalls() const
Definition: TFile.h:235
virtual void SetReadCalls(Int_t readcalls=0)
Definition: TFile.h:281
ECacheAction
TTreeCache flushing semantics.
Definition: TFile.h:70
@ kDisconnect
Definition: TFile.h:70
TArchiveFile * GetArchive() const
Definition: TFile.h:211
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3942
static std::atomic< Int_t > fgReadCalls
Number of read calls made by all TFile objects.
Definition: TFile.h:131
TObject * Next()
Definition: TCollection.h:249
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:821
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:658
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:401
This code implements the MD5 message-digest algorithm.
Definition: TMD5.h:44
const char * AsString() const
Return message digest as string.
Definition: TMD5.cxx:220
void Update(const UChar_t *buf, UInt_t len)
Update TMD5 object to reflect the concatenation of another buffer full of bytes.
Definition: TMD5.cxx:108
void Final()
MD5 finalization, ends an MD5 message-digest operation, writing the message digest and zeroizing ...
Definition: TMD5.cxx:167
Int_t Post()
Increment the value of the semaphore.
Definition: TSemaphore.cxx:103
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Definition: TSemaphore.cxx:35
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Definition: TStopwatch.cxx:110
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Definition: TStopwatch.cxx:58
void Stop()
Stop the stopwatch.
Definition: TStopwatch.cxx:77
Basic string class.
Definition: TString.h:131
Ssiz_t Length() const
Definition: TString.h:405
const char * Data() const
Definition: TString.h:364
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2289
virtual void * OpenDirectory(const char *name)
Open a directory. Returns 0 if directory does not exist.
Definition: TSystem.cxx:832
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
Definition: TSystem.cxx:902
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
Definition: TSystem.cxx:1393
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:509
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:562
void *(* VoidRtnFunc_t)(void *)
Definition: TThread.h:48
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:71
virtual Bool_t SendFileReadProgress(TFile *)
constexpr size_t block
Definition: BatchHelpers.h:29
Definition: file.py:1
Definition: first.py:1