52   for (
Int_t i = 0; i < size; i++) {
 
   64   return fUnzipStatus[index].load() == 
kUntouched;
 
   70   return fUnzipStatus[index].load() == 
kProgress;
 
   76   return fUnzipStatus[index].load() == 
kFinished;
 
   84   return (fUnzipStatus[index].load() == 
kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
 
   94   std::vector<Int_t>       aUnzipLen    = std::vector<Int_t>(newSize, 0);
 
   95   std::unique_ptr<char[]> *aUnzipChunks = 
new std::unique_ptr<char[]>[newSize];
 
   96   std::atomic<Byte_t>     *aUnzipStatus = 
new std::atomic<Byte_t>[newSize];
 
   98   for (
Int_t i = 0; i < newSize; ++i)
 
   99      aUnzipStatus[i].store(0);
 
  101   for (
Int_t i = 0; i < oldSize; i++) {
 
  102      aUnzipLen[i]    = fUnzipLen[i];
 
  103      aUnzipChunks[i] = std::move(fUnzipChunks[i]);
 
  104      aUnzipStatus[i].store(fUnzipStatus[i].load());
 
  107   if (fUnzipChunks) 
delete [] fUnzipChunks;
 
  108   if (fUnzipStatus) 
delete [] fUnzipStatus;
 
  110   fUnzipLen    = aUnzipLen;
 
  111   fUnzipChunks = aUnzipChunks;
 
  112   fUnzipStatus = aUnzipStatus;
 
  125   fUnzipLen[index] = 0;
 
  126   fUnzipChunks[index].reset();
 
  133   fUnzipChunks[index].reset();
 
  141   fUnzipLen[index] = len;
 
  142   fUnzipChunks[index].reset(buf);
 
  152   return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
 
  213         Info(
"TTreeCacheUnzip", 
"Enabling Parallel Unzipping");
 
  219      Warning(
"TTreeCacheUnzip", 
"Parallel Option unknown");
 
  283   if (entry == -1)  entry = 0;
 
  312      if (
b->GetDirectory() == 0) 
continue;
 
  313      if (
b->GetDirectory()->GetFile() != 
fFile) 
continue;
 
  314      Int_t nb = 
b->GetMaxBaskets();
 
  315      Int_t *lbaskets   = 
b->GetBasketBytes();
 
  317      if (!lbaskets || !entries) 
continue;
 
  320      Int_t blistsize = 
b->GetListOfBaskets()->GetSize();
 
  321      for (
Int_t j=0;j<nb;j++) {
 
  323         if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) 
continue;
 
  326         Int_t len = lbaskets[j];
 
  327         if (pos <= 0 || len <= 0) 
continue;
 
  330         if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) 
continue;
 
  333            if (j < nb - 1) emax = entries[j+1] - 1;
 
  334            if (!elist->
ContainsRange(entries[j] + chainOffset, emax + chainOffset)) 
continue;
 
  473   Int_t nread = maxbytes;
 
  476   if (nb < 0) 
return nread;
 
  478   const Int_t headerSize = 16;
 
  479   if (nread < headerSize) 
return nread;
 
  484   if (!olen) olen = nbytes - klen;
 
  527   const Int_t hlen = 128;
 
  528   Int_t objlen = 0, keylen = 0;
 
  537   rdoffs = 
fSeek[index];
 
  553      locbuff = 
new char[rdlen];
 
  554   } 
else if (rdlen * 3 < 16384) {
 
  555      locbuff = 
new char[rdlen * 2];
 
  557      locbuff = 
new char[16384];
 
  564      if (locbuff) 
delete [] locbuff;
 
  570   Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
 
  577                   Info(
"UnzipCache", 
"Block %d is too big, skipping.", index);
 
  580           if (locbuff) 
delete [] locbuff;
 
  587   if ((loclen > 0) && (loclen == objlen + keylen)) {
 
  590         if (locbuff) 
delete [] locbuff;
 
  599   if (locbuff) 
delete [] locbuff;
 
  611   auto mapFunction = [&]() {
 
  612      auto unzipFunction = [&](
const std::vector<Int_t> &indices) {
 
  616         for (
auto ii : indices) {
 
  621                     Info(
"UnzipCache", 
"Unzipping failed or cache is in learning state");
 
  628      std::vector<std::vector<Int_t>> basketIndices;
 
  629      std::vector<Int_t> indices;
 
  634            indices.push_back(i);
 
  639         basketIndices.push_back(indices);
 
  644      pool.
Foreach(unzipFunction, basketIndices);
 
  738               if ( myCycle != 
fCycle ) {
 
  740                     Info(
"GetUnzipBuffer", 
"Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
 
  855   const Int_t hlen = 128;
 
  856   Int_t nbytes = 0, objlen = 0, keylen = 0;
 
  863      if(objlen > nbytes - keylen && 
R__unzip_header(&nin, bufcur, &nbuf) != 0) {
 
  864         Error(
"UnzipBuffer", 
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
 
  868      Int_t l = keylen + objlen;
 
  878   Bool_t oldCase = objlen == nbytes - keylen
 
  882   if (objlen > nbytes-keylen || oldCase) {
 
  885      memcpy(*
dest, src, keylen);
 
  888      char *objbuf = *
dest + keylen;
 
  898            Info(
"UnzipBuffer", 
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
 
  899                 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
 
  900         if (oldCase && (nin > objlen || nbuf > objlen)) {
 
  902               Info(
"UnzipBuffer", 
"oldcase objlen :%d ", objlen);
 
  905            memcpy(*
dest + keylen, src + keylen, objlen);
 
  910         R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
 
  913            Info(
"UnzipBuffer", 
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
 
  914                 nin, bufcur, nbuf, objbuf, nout);
 
  918         if (noutot >= objlen) 
break;
 
  923      if (noutot != objlen) {
 
  924         Error(
"UnzipBuffer", 
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
 
  925               nbytes,keylen,objlen, noutot,nout,nin,nbuf);
 
  927         if(alloc) 
delete [] *
dest;
 
  933      memcpy(*
dest, src, keylen);
 
  935      memcpy(*
dest + keylen, src + keylen, objlen);
 
  945   printf(
"******TreeCacheUnzip statistics for file: %s ******\n",
fFile->
GetName());
 
  947   printf(
"Number of blocks unzipped by threads: %d\n", 
fNUnzip);
 
  948   printf(
"Number of hits: %d\n", 
fNFound);
 
  949   printf(
"Number of stalls: %d\n", 
fNStalls);
 
  950   printf(
"Number of misses: %d\n", 
fNMissed);
 
void frombuf(char *&buf, Bool_t *x)
 
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
 
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
 
#define R__LOCKGUARD(mutex)
 
A class to manage the asynchronous execution of work items.
 
This class provides a simple interface to execute the same task multiple times in parallel,...
 
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
 
A TTree is a list of TBranches.
 
A chain is a collection of files containing TTree objects.
 
virtual Int_t GetTreeNumber() const
 
Long64_t * GetTreeOffset() const
 
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
 
A TEventList object is a list of selected events (entries) in a TTree.
 
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
 
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
 
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
 
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
 
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
 
Int_t fNtot
Total size of prefetched blocks.
 
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
 
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
 
Bool_t fIsTransferred
True when fBuffer contains something valid.
 
TFile * fFile
Pointer to file.
 
Int_t fNseek
Number of blocks to be prefetched.
 
virtual Int_t GetBufferSize() const
 
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
 
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
 
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
 
virtual const char * GetName() const
Returns name of object.
 
TObject * UncheckedAt(Int_t i) const
 
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
 
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
 
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
 
A TTreeCache which exploits parallelized decompression of its own content.
 
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
 
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
 
void Init()
Initialization procedure common to all the constructors.
 
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
 
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
 
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
 
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
 
Int_t fNseekMax
! fNseek can change so we need to know its max size
 
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
 
void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
 
Int_t fNStalls
! number of hits which caused a stall
 
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
 
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
 
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
 
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
 
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
 
static void SetUnzipRelBufferSize(Float_t relbufferSize)
Static function: sets the unzip relative buffer size.
 
void Print(Option_t *option="") const
Print cache statistics.
 
std::unique_ptr< TMutex > fIOMutex
 
Int_t fNUnzip
! number of blocks that were unzipped
 
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
 
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
 
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
 
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
 
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
 
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by an IMT task
 
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
 
Int_t fNFound
! number of blocks that were found in the cache
 
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure,...
 
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by TBranch::Ge...
 
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
 
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
 
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
 
A cache to speed-up the reading of ROOT datasets.
 
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by the user vi...
 
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
 
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
 
Long64_t fEntryMin
! first entry in the cache
 
Long64_t fEntryNext
! next entry number where cache must be filled
 
Bool_t fIsLearning
! true if cache is in learning mode
 
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
 
Long64_t fEntryMax
! last entry in the cache
 
Long64_t fEntryCurrent
! current lowest entry number in the cache
 
Int_t fNReadPref
Number of blocks that were prefetched.
 
TTree * fTree
! pointer to the current Tree
 
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
 
Int_t fNbranches
! Number of branches in the cache
 
virtual void Print(Option_t *option="") const
Print cache statistics.
 
TObjArray * fBranches
! List of branches to be stored in the cache
 
Helper class to iterate over cluster of baskets.
 
A TTree represents a columnar dataset.
 
TEventList * GetEventList() const
 
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
 
Long64_t BinarySearch(Long64_t n, const T *array, T value)
 
Bool_t IsFinished(Int_t index) const
 
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
 
void Clear(Int_t size)
Clear all baskets' state arrays.
 
Bool_t IsProgress(Int_t index) const
 
std::atomic< Byte_t > * fUnzipStatus
! [fNseek]
 
void SetUnzipped(Int_t index, char *buf, Int_t len)
 
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
 
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
 
void SetMissed(Int_t index)
 
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
 
Bool_t IsUntouched(Int_t index) const
 
void SetFinished(Int_t index)
Set cache as finished.
 
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
 
#define dest(otri, vertexptr)