75 for (
Int_t i = 0; i < size; i++) {
117 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
118 std::unique_ptr<char[]> *aUnzipChunks =
new std::unique_ptr<char[]>[newSize];
119 std::atomic<Byte_t> *aUnzipStatus =
new std::atomic<Byte_t>[newSize];
120 memset(aUnzipStatus, 0, newSize*
sizeof(std::atomic<Byte_t>));
122 for (
Int_t i = 0; i < oldSize; i++) {
173 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
234 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
240 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
305 if (entry == -1) entry = 0;
339 if (!lbaskets || !entries)
continue;
343 for (
Int_t j=0;j<nb;j++) {
345 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
348 Int_t len = lbaskets[j];
349 if (pos <= 0 || len <= 0)
continue;
352 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry))
continue;
355 if (j < nb - 1) emax = entries[j+1] - 1;
356 if (!elist->
ContainsRange(entries[j] + chainOffset, emax + chainOffset))
continue;
495 Int_t nread = maxbytes;
498 if (nb < 0)
return nread;
500 const Int_t headerSize = 16;
501 if (nread < headerSize)
return nread;
506 if (!olen) olen = nbytes - klen;
549 const Int_t hlen = 128;
550 Int_t objlen = 0, keylen = 0;
559 rdoffs =
fSeek[index];
575 locbuff =
new char[rdlen];
576 }
else if (rdlen * 3 < 16384) {
577 locbuff =
new char[rdlen * 2];
579 locbuff =
new char[16384];
586 if (locbuff)
delete [] locbuff;
592 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
599 Info(
"UnzipCache",
"Block %d is too big, skipping.", index);
602 if (locbuff)
delete [] locbuff;
609 if ((loclen > 0) && (loclen == objlen + keylen)) {
612 if (locbuff)
delete [] locbuff;
621 if (locbuff)
delete [] locbuff;
633 auto mapFunction = [&]() {
634 auto unzipFunction = [&](
const std::vector<Int_t> &indices) {
638 for (
auto ii : indices) {
643 Info(
"UnzipCache",
"Unzipping failed or cache is in learning state");
650 std::vector<std::vector<Int_t>> basketIndices;
651 std::vector<Int_t> indices;
656 indices.push_back(i);
658 if (i >= fNseek)
break;
661 basketIndices.push_back(indices);
666 pool.
Foreach(unzipFunction, basketIndices);
745 Int_t idx = (seekidx + 1 + ii) % fNseek;
760 if ( myCycle !=
fCycle ) {
762 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
875 const Int_t hlen = 128;
876 Int_t nbytes = 0, objlen = 0, keylen = 0;
883 if(objlen > nbytes - keylen &&
R__unzip_header(&nin, bufcur, &nbuf) != 0) {
884 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
888 Int_t l = keylen + objlen;
898 Bool_t oldCase = objlen == nbytes - keylen
902 if (objlen > nbytes-keylen || oldCase) {
905 memcpy(*dest, src, keylen);
908 char *objbuf = *dest + keylen;
918 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
919 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
920 if (oldCase && (nin > objlen || nbuf > objlen)) {
922 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
925 memcpy(*dest + keylen, src + keylen, objlen);
930 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
933 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
934 nin, bufcur, nbuf, objbuf, nout);
938 if (noutot >= objlen)
break;
943 if (noutot != objlen) {
944 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
945 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
947 if(alloc)
delete [] *
dest;
953 memcpy(*dest, src, keylen);
955 memcpy(*dest + keylen, src + keylen, objlen);
965 printf(
"******TreeCacheUnzip statistics for file: %s ******\n",
fFile->
GetName());
967 printf(
"Number of blocks unzipped by threads: %d\n",
fNUnzip);
968 printf(
"Number of hits: %d\n",
fNFound);
969 printf(
"Number of stalls: %d\n",
fNStalls);
970 printf(
"Number of misses: %d\n",
fNMissed);
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual const char * GetName() const
Returns name of object.
Int_t fNtot
Total size of prefetched blocks.
void frombuf(char *&buf, Bool_t *x)
Long64_t fEntryMax
! last entry in the cache
Bool_t IsProgress(Int_t index) const
Long64_t * GetBasketEntry() const
Int_t fNUnzip
! number of blocks that were unzipped
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
TFile * fFile
Pointer to file.
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
Int_t fNStalls
! number of hits which caused a stall
TObjArray * fBranches
! List of branches to be stored in the cache
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
A specialized TFileCacheRead object for a TTree.
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
TDirectory * GetDirectory() const
Long64_t fEntryMin
! first entry in the cache
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
void Clear(Int_t size)
Clear all baskets' state arrays.
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
Long64_t * GetTreeOffset() const
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
Int_t fNFound
! number of blocks that were found in the cache
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
static Bool_t IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
Helper class to iterate over cluster of baskets.
Int_t * GetBasketBytes() const
void SetUnzipped(Int_t index, char *buf, Int_t len)
void SetMissed(Int_t index)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
virtual Long64_t GetReadEntry() const
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void Print(Option_t *option="") const
Print cache statistics.
Specialization of TTreeCache for parallel Unzipping.
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
virtual TFile * GetFile() const
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
! pointer to the current Tree
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
This class provides a simple interface to execute the same task multiple times in parallel...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
Int_t CreateTasks()
We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total) to a task...
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
Int_t fNseekMax
! fNseek can change so we need to know its max size
Bool_t IsUntouched(Int_t index) const
A class to manage the asynchronous execution of work items.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Int_t GetMaxBaskets() const
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
A TEventList object is a list of selected events (entries) in a TTree.
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
Int_t fNReadPref
Number of blocks that were prefetched.
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Long64_t fEntryCurrent
! current lowest entry number in the cache
TObject * UncheckedAt(Int_t i) const
virtual void Print(Option_t *option="") const
Print cache statistics.
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
Bool_t fIsLearning
! true if cache is in learning mode
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
virtual Int_t GetTreeNumber() const
virtual Int_t GetBufferSize() const
#define R__LOCKGUARD(mutex)
Bool_t IsFinished(Int_t index) const
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relatibe buffer size
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
virtual Long64_t GetEntries() const
Int_t fNbranches
! Number of branches in the cache
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
#define dest(otri, vertexptr)
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
A TTree object has a header with a name and a title.
TEventList * GetEventList() const
void SetFinished(Int_t index)
Set cache as finished.
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
A TTree is a list of TBranches.
Long64_t fEntryNext
! next entry number where cache must be filled
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Int_t fNseek
Number of blocks to be prefetched.