Parallel Unzipping. TTreeCache has been specialised in order to let additional threads unzip its content in advance. In this implementation we support up to 10 threads, but right now it makes more sense to limit their number to 1-2. The application reading data is carefully synchronized, so that: - if the block it wants is not unzipped, it unzips the block itself without waiting; - if the block is being unzipped in parallel, it waits only for that unzip to finish; - if the block has already been unzipped, it takes it. This is supposed to cancel a part of the unzipping latency, at the expense of CPU time. The default parameters are the same as in the previous version, i.e. 20% of the TTreeCache cache size. To change it use TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize), where bufferSize must be passed in bytes.
TTreeCacheUnzip() | |
TTreeCacheUnzip(TTree* tree, Int_t buffersize = 0) | |
virtual | ~TTreeCacheUnzip() |
void | TObject::AbstractMethod(const char* method) const |
virtual void | AddBranch(TBranch* b, Bool_t subbranches = kFALSE) |
virtual void | AddBranch(const char* branch, Bool_t subbranches = kFALSE) |
virtual void | TObject::AppendPad(Option_t* option = "") |
virtual void | TObject::Browse(TBrowser* b) |
static TClass* | Class() |
virtual const char* | TObject::ClassName() const |
virtual void | TObject::Clear(Option_t* = "") |
virtual TObject* | TObject::Clone(const char* newname = "") const |
virtual Int_t | TObject::Compare(const TObject* obj) const |
virtual void | TObject::Copy(TObject& object) const |
virtual void | TObject::Delete(Option_t* option = "")MENU |
virtual Int_t | TObject::DistancetoPrimitive(Int_t px, Int_t py) |
virtual void | TObject::Draw(Option_t* option = "") |
virtual void | TObject::DrawClass() constMENU |
virtual TObject* | TObject::DrawClone(Option_t* option = "") constMENU |
virtual void | TObject::Dump() constMENU |
virtual void | TObject::Error(const char* method, const char* msgfmt) const |
virtual void | TObject::Execute(const char* method, const char* params, Int_t* error = 0) |
virtual void | TObject::Execute(TMethod* method, TObjArray* params, Int_t* error = 0) |
virtual void | TObject::ExecuteEvent(Int_t event, Int_t px, Int_t py) |
virtual void | TObject::Fatal(const char* method, const char* msgfmt) const |
virtual Bool_t | FillBuffer() |
virtual TObject* | TObject::FindObject(const char* name) const |
virtual TObject* | TObject::FindObject(const TObject* obj) const |
virtual Int_t | TFileCacheRead::GetBufferSize() const |
virtual Option_t* | TObject::GetDrawOption() const |
static Long_t | TObject::GetDtorOnly() |
Double_t | TTreeCache::GetEfficiency() |
Double_t | TTreeCache::GetEfficiencyRel() |
virtual const char* | TObject::GetIconName() const |
static Int_t | TTreeCache::GetLearnEntries() |
virtual const char* | TObject::GetName() const |
Int_t | GetNFound() |
Int_t | GetNMissed() |
Int_t | GetNUnzip() |
virtual char* | TObject::GetObjectInfo(Int_t px, Int_t py) const |
static Bool_t | TObject::GetObjectStat() |
virtual Option_t* | TObject::GetOption() const |
TTree* | TTreeCache::GetOwner() const |
static TTreeCacheUnzip::EParUnzipMode | GetParallelUnzip() |
Int_t | GetRecordHeader(char* buf, Int_t maxbytes, Int_t& nbytes, Int_t& objlen, Int_t& keylen) |
virtual const char* | TObject::GetTitle() const |
TTree* | TTreeCache::GetTree() const |
virtual UInt_t | TObject::GetUniqueID() const |
virtual Int_t | GetUnzipBuffer(char** buf, Long64_t pos, Int_t len, Bool_t* free) |
virtual Bool_t | TObject::HandleTimer(TTimer* timer) |
virtual ULong_t | TObject::Hash() const |
virtual void | TObject::Info(const char* method, const char* msgfmt) const |
virtual Bool_t | TObject::InheritsFrom(const char* classname) const |
virtual Bool_t | TObject::InheritsFrom(const TClass* cl) const |
virtual void | TObject::Inspect() constMENU |
void | TObject::InvertBit(UInt_t f) |
virtual TClass* | IsA() const |
Bool_t | IsActiveThread() |
virtual Bool_t | TFileCacheRead::IsAsyncReading() const |
virtual Bool_t | TObject::IsEqual(const TObject* obj) const |
virtual Bool_t | TObject::IsFolder() const |
virtual Bool_t | TTreeCache::IsLearning() const |
Bool_t | TObject::IsOnHeap() const |
static Bool_t | IsParallelUnzip() |
Bool_t | IsQueueEmpty() |
virtual Bool_t | TObject::IsSortable() const |
Bool_t | TObject::IsZombie() const |
virtual void | TObject::ls(Option_t* option = "") const |
void | TObject::MayNotUse(const char* method) const |
virtual Bool_t | TObject::Notify() |
static void | TObject::operator delete(void* ptr) |
static void | TObject::operator delete(void* ptr, void* vp) |
static void | TObject::operator delete[](void* ptr) |
static void | TObject::operator delete[](void* ptr, void* vp) |
void* | TObject::operator new(size_t sz) |
void* | TObject::operator new(size_t sz, void* vp) |
void* | TObject::operator new[](size_t sz) |
void* | TObject::operator new[](size_t sz, void* vp) |
virtual void | TObject::Paint(Option_t* option = "") |
virtual void | TObject::Pop() |
virtual void | TFileCacheRead::Prefetch(Long64_t pos, Int_t len) |
virtual void | Print(Option_t* option = "") const |
virtual Int_t | TObject::Read(const char* name) |
virtual Int_t | TTreeCache::ReadBuffer(char* buf, Long64_t pos, Int_t len) |
virtual Int_t | ReadBufferExt(char* buf, Long64_t pos, Int_t len, Int_t& loc) |
virtual void | TObject::RecursiveRemove(TObject* obj) |
void | TObject::ResetBit(UInt_t f) |
virtual void | ResetCache() |
virtual void | TObject::SaveAs(const char* filename = "", Option_t* option = "") constMENU |
virtual void | TObject::SavePrimitive(basic_ostream<char,char_traits<char> >& out, Option_t* option = "") |
void | SendUnzipStartSignal(Bool_t broadcast) |
void | TObject::SetBit(UInt_t f) |
void | TObject::SetBit(UInt_t f, Bool_t set) |
virtual void | TObject::SetDrawOption(Option_t* option = "")MENU |
static void | TObject::SetDtorOnly(void* obj) |
virtual void | SetEntryRange(Long64_t emin, Long64_t emax) |
virtual void | TFileCacheRead::SetFile(TFile* file) |
static void | TTreeCache::SetLearnEntries(Int_t n = 10) |
static void | TObject::SetObjectStat(Bool_t stat) |
static Int_t | SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option = TTreeCacheUnzip::kEnable) |
virtual void | TFileCacheRead::SetSkipZip(Bool_t = kTRUE) |
virtual void | TObject::SetUniqueID(UInt_t uid) |
void | SetUnzipBufferSize(Long64_t bufferSize) |
static void | SetUnzipRelBufferSize(Float_t relbufferSize) |
virtual void | ShowMembers(TMemberInspector& insp, char* parent) |
virtual void | TFileCacheRead::Sort() |
void | TTreeCache::StartLearningPhase() |
virtual void | StopLearningPhase() |
virtual void | Streamer(TBuffer& b) |
void | StreamerNVirtual(TBuffer& b) |
virtual void | TObject::SysError(const char* method, const char* msgfmt) const |
Bool_t | TObject::TestBit(UInt_t f) const |
Int_t | TObject::TestBits(UInt_t f) const |
Int_t | UnzipBuffer(char** dest, char* src) |
Int_t | UnzipCache(Int_t& startindex, Int_t& locbuffsz, char*& locbuff) |
static void* | UnzipLoop(void* arg) |
virtual void | UpdateBranches(TTree* tree, Bool_t owner = kFALSE) |
virtual void | TObject::UseCurrentStyle() |
void | WaitUnzipStartSignal() |
virtual void | TObject::Warning(const char* method, const char* msgfmt) const |
virtual Int_t | TObject::Write(const char* name = 0, Int_t option = 0, Int_t bufsize = 0) |
virtual Int_t | TObject::Write(const char* name = 0, Int_t option = 0, Int_t bufsize = 0) const |
virtual void | TObject::DoError(int level, const char* location, const char* fmt, va_list va) const |
void | TObject::MakeZombie() |
TTreeCacheUnzip(const TTreeCacheUnzip&) | |
void | Init() |
TTreeCacheUnzip& | operator=(const TTreeCacheUnzip&) |
Int_t | StartThreadUnzip(Int_t nthreads) |
Int_t | StopThreadUnzip() |
enum EParUnzipMode { | kEnable | |
kDisable | ||
kForce | ||
}; | ||
enum TObject::EStatusBits { | kCanDelete | |
kMustCleanup | ||
kObjInCanvas | ||
kIsReferenced | ||
kHasUUID | ||
kCannotPick | ||
kNoContextMenu | ||
kInvalidObject | ||
}; | ||
enum TObject::[unnamed] { | kIsOnHeap | |
kNotDeleted | ||
kZombie | ||
kBitMask | ||
kSingleKey | ||
kOverwrite | ||
kWriteDelete | ||
}; |
queue<Int_t> | fActiveBlks | The blocks which are active now |
Bool_t | fActiveThread | Used to terminate gracefully the unzippers |
Bool_t | fAsyncReading | |
Bool_t | TFileCacheRead::fAsyncReading | |
Int_t | fBlocksToGo | |
TList* | TTreeCache::fBrNames | ! list of branch names in the cache |
TObjArray* | TTreeCache::fBranches | ! List of branches to be stored in the cache |
char* | TFileCacheRead::fBuffer | [fBufferSize] buffer of contiguous prefetched blocks |
Int_t | TFileCacheRead::fBufferLen | Current buffer length (<= fBufferSize) |
Int_t | TFileCacheRead::fBufferSize | Allocated size of fBuffer (at a given time) |
Int_t | TFileCacheRead::fBufferSizeMin | Original size of fBuffer |
Int_t | fCycle | |
Long64_t | TTreeCache::fEntryCurrent | ! current lowest entry number in the cache |
Long64_t | TTreeCache::fEntryMax | ! last entry in the cache |
Long64_t | TTreeCache::fEntryMin | ! first entry in the cache |
Long64_t | TTreeCache::fEntryNext | ! next entry number where cache must be filled |
TFile* | TFileCacheRead::fFile | Pointer to file |
TMutex* | fIOMutex | |
Bool_t | TTreeCache::fIsLearning | ! true if cache is in learning mode |
Bool_t | TTreeCache::fIsManual | ! true if cache is StopLearningPhase was used |
Bool_t | TFileCacheRead::fIsSorted | True if fSeek array is sorted |
Bool_t | TFileCacheRead::fIsTransferred | True when fBuffer contains something valid |
Int_t | fLastReadPos | |
Int_t* | TFileCacheRead::fLen | [fNb] Length of long buffers |
TMutex* | fMutexList | Mutex to protect the various lists. Used by the condvars. |
Int_t | fNFound | ! number of blocks that were found in the cache |
Int_t | fNMissed | ! number of blocks that were not found in the cache and were unzipped |
Int_t | TTreeCache::fNReadMiss | Number of blocks read and not found in the cache |
Int_t | TTreeCache::fNReadOk | Number of blocks read and found in the cache |
Int_t | TTreeCache::fNReadPref | Number of blocks that were prefetched |
Int_t | fNStalls | ! number of hits which caused a stall |
Int_t | fNUnzip | ! number of blocks that were unzipped |
Int_t | TFileCacheRead::fNb | Number of long buffers |
Int_t | TTreeCache::fNbranches | ! Number of branches in the cache |
Int_t | TFileCacheRead::fNseek | Number of blocks to be prefetched |
Int_t | fNseekMax | ! fNseek can change so we need to know its max size |
Int_t | TFileCacheRead::fNtot | Total size of prefetched blocks |
TTree* | TTreeCache::fOwner | ! pointer to the owner Tree/chain |
Bool_t | fParallel | Indicate if we want to activate the parallelism (for this instance) |
Long64_t* | TFileCacheRead::fPos | [fNb] start of long buffers |
Long64_t* | TFileCacheRead::fSeek | [fNseek] Position on file of buffers to be prefetched |
Int_t* | TFileCacheRead::fSeekIndex | [fNseek] sorted index table of fSeek |
Int_t* | TFileCacheRead::fSeekLen | [fNseek] Length of buffers to be prefetched |
Int_t* | TFileCacheRead::fSeekPos | [fNseek] Position of sorted blocks in fBuffer |
Int_t | TFileCacheRead::fSeekSize | Allocated size of fSeek |
Long64_t* | TFileCacheRead::fSeekSort | [fNseek] Position on file of buffers to be prefetched (sorted) |
Int_t* | TFileCacheRead::fSeekSortLen | [fNseek] Length of buffers to be prefetched (sorted) |
Long64_t | fTotalUnzipBytes | ! The total sum of the currently unzipped blks |
TTree* | TTreeCache::fTree | ! pointer to the current Tree |
Long64_t | fUnzipBufferSize | ! Max Size for the ready unzipped blocks (default is 2*fBufferSize) |
char** | fUnzipChunks | ! [fNseek] Individual unzipped chunks. Their summed size is kept under control. |
TCondition* | fUnzipDoneCondition | Used to wait for an unzip tour to finish. Gives the Async feel. |
Int_t* | fUnzipLen | ! [fNseek] Length of the unzipped buffers |
TCondition* | fUnzipStartCondition | Used to signal the threads to start. |
Byte_t* | fUnzipStatus | ! [fNseek] For each blk, tells us if it's unzipped or pending |
TThread* | fUnzipThread[10] | |
Long64_t | TTreeCache::fZipBytes | ! Total compressed size of branches in cache |
static Int_t | TTreeCache::fgLearnEntries | Number of entries used for learning mode |
static TTreeCacheUnzip::EParUnzipMode | fgParallel | Indicate if we want to activate the parallelism |
static Double_t | fgRelBuffSize | This is the percentage of the TTreeCacheUnzip that will be used |
char* | fCompBuffer | |
Int_t | fCompBufferSize |
Set the minimum and maximum entry number to be processed. This information helps to optimize the number of baskets to read when prefetching the branch buffers.
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just after getting the buffers
update pointer to current Tree and recompute pointers to the branches in the cache
Static function that tells whether the multithreading unzipping is activated.
This will send the signal corresponding to the queue... normally used when we want to start processing the list of buffers.
Static function that (de)activates multithreading unzipping. The possible options are: kEnable _Enable_ it, which causes an automatic detection and launches the additional thread if the number of cores in the machine is greater than one; kDisable _Disable_ will not activate the additional thread; kForce _Force_ will start the additional thread even if there is only one core. The default will be taken as kEnable. Returns 0 if there was an error, 1 otherwise.
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and processes it... unfortunately, a Thread is not an object and we have to deal with it in the old C-Style way. Returns 0 if the thread was initialized or 1 if it was already running.
To stop the thread we only need to change the value of the variable fActiveThread to false and the loop will stop (of course, we will have to do the cleaning after that). Note: The synchronization part is important here or we will try to delete the object while it's still processing the queue.
This is a static function. This is the call that will be executed in the Thread generated by StartThreadUnzip... what we want to do is to inflate the next series of buffers leaving them in the second cache. Returns 0 when it finishes.
Read the logical record header from the buffer buf. That must be the pointer to the header part, not the object itself, and must contain data of at least maxbytes. Returns nread. In output arguments: nbytes : number of bytes in record; if negative, this is a deleted record; if 0, cannot read record, wrong value of argument first. objlen : uncompressed object size. keylen : length of logical record header. Note that the arguments objlen and keylen are returned only if maxbytes >=16. Note: This was adapted from TFile... so some things don't apply.
This will delete the list of buffers that are in the unzipping cache and will reset certain values in the cache. This name is ambiguous because the method doesn't reset the whole cache, only the part related to the unzipping. Note: This method is completely different from TTreeCache::ResetCache(); in that method we were cleaning the prefetching buffer while here we delete the information about the unzipped buffers.
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure, 0 in case it's not in the cache and n>0 in case read from cache (number of bytes copied). pos and len are the original values as were passed to ReadBuffer but instead we will return the inflated buffer. Note!! : If *buf == 0 we will allocate the buffer and it will be the responsibility of the caller to free it... it is useful for example to pass it to the creator of TBuffer.
Static function: sets the unzip relative buffer size. FABRIZIO: PLEASE DOCUMENT and also in TTree::Set...
Sets the size for the unzipping cache... by default it should be two times the size of the prefetching cache
Unzips a ROOT specific buffer... by reading the header at the beginning. Returns the size of the inflated buffer or -1 if error. Note!! : If *dest == 0 we will allocate the buffer and it will be the responsibility of the caller to free it... it is useful for example to pass it to the creator of TBuffer. src is the original buffer with the record (header+compressed data). *dest is the inflated buffer (including the header).
This inflates all the buffers in the cache.. passing the data to a new buffer that will only wait there to be read... We cannot inflate all the buffers in the cache so we will try to do it until the cache gets full... there is a member called fUnzipBufferSize which will tell us the max size we can allocate for this cache. Note that we will unzip in the order they were put into the cache, not the order of the transfer, so it has to be read in that order or the pre-unzipping will be useless. startindex is used as start index to check for blks to be unzipped. Returns 0 in normal conditions or -1 if error, 1 if it would like to sleep. This function is supposed to compete among an indefinite number of threads to get a chunk to inflate in order to accommodate multiple unzippers. Since everything is so async, we cannot use a fixed buffer; we are forced to keep the individual chunks as separate blocks, whose summed size does not exceed the maximum allowed. The pointers are kept globally in the array fUnzipChunks.