Logo ROOT   6.08/07
Reference Guide
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1 // @(#)root/tree:$Id$
2 // Author: Leandro Franco 10/04/2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TTreeCacheUnzip
13 \ingroup tree
14 
15 Specialization of TTreeCache for parallel Unzipping.
16 
17 Fabrizio Furano (CERN) Aug 2009
18 Core TTree-related code borrowed from the previous version
19  by Leandro Franco and Rene Brun
20 
21 ## Parallel Unzipping
22 
23 TTreeCache has been specialised in order to let additional threads
24 free to unzip in advance its content. In this implementation we
25 support up to 10 threads, but right now it makes more sense to
26 limit their number to 1-2
27 
28 The application reading data is carefully synchronized, in order to:
29  - if the block it wants is not unzipped, it self-unzips it without
30  waiting
31  - if the block is being unzipped in parallel, it waits only
32  for that unzip to finish
33  - if the block has already been unzipped, it takes it
34 
35 This is supposed to cancel a part of the unzipping latency, at the
36 expenses of cpu time.
37 
38 The default parameters are the same of the prev version, i.e. 20%
39 of the TTreeCache cache size. To change it use
40 TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)
41 where bufferSize must be passed in bytes.
42 */
43 
44 #include "TTreeCacheUnzip.h"
45 #include "TChain.h"
46 #include "TBranch.h"
47 #include "TFile.h"
48 #include "TEventList.h"
49 #include "TMutex.h"
50 #include "TVirtualMutex.h"
51 #include "TThread.h"
52 #include "TCondition.h"
53 #include "TMath.h"
54 #include "Bytes.h"
55 
56 #include "TEnv.h"
57 
58 #define THREADCNT 2
59 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
60 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
61 
63 
64 // The unzip cache does not consume memory by itself, it just allocates in advance
65 // mem blocks which are then picked as they are by the baskets.
66 // Hence there is no good reason to limit it too much
68 
70 
71 ////////////////////////////////////////////////////////////////////////////////
72 
74 
75  fActiveThread(kFALSE),
76  fAsyncReading(kFALSE),
77  fCycle(0),
78  fLastReadPos(0),
79  fBlocksToGo(0),
80  fUnzipLen(0),
81  fUnzipChunks(0),
82  fUnzipStatus(0),
83  fTotalUnzipBytes(0),
84  fNseekMax(0),
85  fUnzipBufferSize(0),
86  fNUnzip(0),
87  fNFound(0),
88  fNStalls(0),
89  fNMissed(0)
90 
91 {
92  // Default Constructor.
93 
94  Init();
95 }
96 
97 ////////////////////////////////////////////////////////////////////////////////
98 /// Constructor.
99 
100 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
101  fActiveThread(kFALSE),
102  fAsyncReading(kFALSE),
103  fCycle(0),
104  fLastReadPos(0),
105  fBlocksToGo(0),
106  fUnzipLen(0),
107  fUnzipChunks(0),
108  fUnzipStatus(0),
109  fTotalUnzipBytes(0),
110  fNseekMax(0),
111  fUnzipBufferSize(0),
112  fNUnzip(0),
113  fNFound(0),
114  fNStalls(0),
115  fNMissed(0)
116 {
117  Init();
118 }
119 
120 ////////////////////////////////////////////////////////////////////////////////
121 /// Initialization procedure common to all the constructors.
122 
124 {
125  fMutexList = new TMutex(kTRUE);
126  fIOMutex = new TMutex(kTRUE);
127 
130 
131  fTotalUnzipBytes = 0;
132 
133  fCompBuffer = new char[16384];
134  fCompBufferSize = 16384;
135 
136  if (fgParallel == kDisable) {
137  fParallel = kFALSE;
138  }
139  else if(fgParallel == kEnable || fgParallel == kForce) {
140  SysInfo_t info;
141  gSystem->GetSysInfo(&info);
142 
144 
145  if(gDebug > 0)
146  Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
147 
148  fParallel = kTRUE;
149 
150  for (Int_t i = 0; i < 10; i++) fUnzipThread[i] = 0;
151 
153 
154  }
155  else {
156  Warning("TTreeCacheUnzip", "Parallel Option unknown");
157  }
158 
159  // Check if asynchronous reading is supported by this TFile specialization
160  if (gEnv->GetValue("TFile.AsyncReading", 1)) {
161  if (fFile && !(fFile->ReadBufferAsync(0, 0)))
163  }
164 
165 }
166 
167 ////////////////////////////////////////////////////////////////////////////////
168 /// Destructor. (in general called by the TFile destructor)
169 
171 {
172  ResetCache();
173 
174  if (IsActiveThread())
175  StopThreadUnzip();
176 
177  delete [] fUnzipLen;
178 
179  delete fUnzipStartCondition;
180  delete fUnzipDoneCondition;
181 
182  delete fMutexList;
183  delete fIOMutex;
184 
185  delete [] fUnzipStatus;
186  delete [] fUnzipChunks;
187 }
188 
189 ////////////////////////////////////////////////////////////////////////////////
190 /// Add a branch to the list of branches to be stored in the cache
191 /// this function is called by TBranch::GetBasket
192 /// Returns:
193 /// - 0 branch added or already included
194 /// - -1 on error
195 
196 Int_t TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
197 {
199 
200  return TTreeCache::AddBranch(b, subbranches);
201 }
202 
203 ////////////////////////////////////////////////////////////////////////////////
204 /// Add a branch to the list of branches to be stored in the cache
205 /// this function is called by TBranch::GetBasket
206 /// Returns:
207 /// - 0 branch added or already included
208 /// - -1 on error
209 
210 Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
211 {
213 
214  return TTreeCache::AddBranch(branch, subbranches);
215 }
216 
217 ////////////////////////////////////////////////////////////////////////////////
218 
220 {
221  if (fNbranches <= 0) return kFALSE;
222  {
223  // Fill the cache buffer with the branches in the cache.
226 
228  Long64_t entry = tree->GetReadEntry();
229 
230  // If the entry is in the range we previously prefetched, there is
231  // no point in retrying. Note that this will also return false
232  // during the training phase (fEntryNext is then set intentional to
233  // the end of the training phase).
234  if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
235 
236  // Triggered by the user, not the learning phase
237  if (entry == -1) entry=0;
238 
239  TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
240  fEntryCurrent = clusterIter();
241  fEntryNext = clusterIter.GetNextEntry();
242 
244  if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
246 
247  // Check if owner has a TEventList set. If yes we optimize for this
248  // Special case reading only the baskets containing entries in the
249  // list.
250  TEventList *elist = fTree->GetEventList();
251  Long64_t chainOffset = 0;
252  if (elist) {
253  if (fTree->IsA() ==TChain::Class()) {
254  TChain *chain = (TChain*)fTree;
255  Int_t t = chain->GetTreeNumber();
256  chainOffset = chain->GetTreeOffset()[t];
257  }
258  }
259 
260  //clear cache buffer
262 
263  //store baskets
264  for (Int_t i=0;i<fNbranches;i++) {
266  if (b->GetDirectory()==0) continue;
267  if (b->GetDirectory()->GetFile() != fFile) continue;
268  Int_t nb = b->GetMaxBaskets();
269  Int_t *lbaskets = b->GetBasketBytes();
270  Long64_t *entries = b->GetBasketEntry();
271  if (!lbaskets || !entries) continue;
272  //we have found the branch. We now register all its baskets
273  //from the requested offset to the basket below fEntrymax
274  Int_t blistsize = b->GetListOfBaskets()->GetSize();
275  for (Int_t j=0;j<nb;j++) {
276  // This basket has already been read, skip it
277  if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
278 
279  Long64_t pos = b->GetBasketSeek(j);
280  Int_t len = lbaskets[j];
281  if (pos <= 0 || len <= 0) continue;
282  //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
283  if (entries[j] >= fEntryNext) continue;
284  if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry)) continue;
285  if (elist) {
286  Long64_t emax = fEntryMax;
287  if (j<nb-1) emax = entries[j+1]-1;
288  if (!elist->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
289  }
290  fNReadPref++;
291 
292  TFileCacheRead::Prefetch(pos,len);
293  }
294  if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
295  }
296 
297  // Now fix the size of the status arrays
298  ResetCache();
299 
301 
302  }
303 
304  return kTRUE;
305 }
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Change the underlying buffer size of the cache.
309 /// Returns:
310 /// - 0 if the buffer content is still available
311 /// - 1 if some or all of the buffer content has been made unavailable
312 /// - -1 on error
313 
315 {
317 
318  Int_t res = TTreeCache::SetBufferSize(buffersize);
319  if (res < 0) {
320  return res;
321  }
323  ResetCache();
324  return 1;
325 }
326 
327 ////////////////////////////////////////////////////////////////////////////////
328 /// Set the minimum and maximum entry number to be processed
329 /// this information helps to optimize the number of baskets to read
330 /// when prefetching the branch buffers.
331 
333 {
335 
336  TTreeCache::SetEntryRange(emin, emax);
337 }
338 
339 ////////////////////////////////////////////////////////////////////////////////
340 /// It's the same as TTreeCache::StopLearningPhase but we guarantee that
341 /// we start the unzipping just after getting the buffers
342 
344 {
346 
348 
349 }
350 
351 ////////////////////////////////////////////////////////////////////////////////
352 ///update pointer to current Tree and recompute pointers to the branches in the cache
353 
355 {
357 
359 }
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 // //
363 // From now on we have the methods concerning the threading part of the cache //
364 // //
365 ////////////////////////////////////////////////////////////////////////////////
366 
367 ////////////////////////////////////////////////////////////////////////////////
368 /// Static function that returns the parallel option
369 /// (to indicate an additional thread)
370 
372 {
373  return fgParallel;
374 }
375 
376 ////////////////////////////////////////////////////////////////////////////////
377 /// Static function that tells wether the multithreading unzipping is activated
378 
380 {
381  if (fgParallel == kEnable || fgParallel == kForce)
382  return kTRUE;
383 
384  return kFALSE;
385 }
386 
387 ////////////////////////////////////////////////////////////////////////////////
388 /// This indicates if the thread is active in this moment...
389 /// this variable is very important because if we change it from true to
390 /// false the thread will stop... ( see StopThreadTreeCacheUnzip() )
391 
393 {
395 
396  return fActiveThread;
397 }
398 
399 ////////////////////////////////////////////////////////////////////////////////
400 /// It says if the queue is empty... useful to see if we have to process it.
401 
403 {
405 
406  if ( fIsLearning )
407  return kTRUE;
408 
409  return kFALSE;
410 }
411 
413 {
414  // Here the threads sleep waiting for some blocks to unzip
415 
417 
418 }
419 
420 ////////////////////////////////////////////////////////////////////////////////
421 /// This will send the signal corresponfing to the queue... normally used
422 /// when we want to start processing the list of buffers.
423 
425 {
426  if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
427 
428  if (broadcast)
430  else
432 }
433 
434 ////////////////////////////////////////////////////////////////////////////////
435 /// Static function that (de)activates multithreading unzipping
436 ///
437 /// The possible options are:
438 /// - kEnable _Enable_ it, which causes an automatic detection and launches the
439 /// additional thread if the number of cores in the machine is greater than
440 /// one
441 /// - kDisable _Disable_ will not activate the additional thread.
442 /// - kForce _Force_ will start the additional thread even if there is only one
443 /// core. the default will be taken as kEnable.
444 ///
445 /// Returns 0 if there was an error, 1 otherwise.
446 
448 {
450  fgParallel = option;
451  return 1;
452  }
453  return 0;
454 }
455 
456 class TTreeCacheUnzipData {
457 public:
458  TTreeCacheUnzip *fInstance;
459  Int_t fCount;
460 };
461 
462 ////////////////////////////////////////////////////////////////////////////////
463 /// The Thread is only a part of the TTreeCache but it is the part that
464 /// waits for info in the queue and process it... unfortunatly, a Thread is
465 /// not an object an we have to deal with it in the old C-Style way
466 /// Returns 0 if the thread was initialized or 1 if it was already running
467 
469 {
470  Int_t nt = nthreads;
471  if (nt > 10) nt = 10;
472 
473  if (gDebug > 0)
474  Info("StartThreadUnzip", "Going to start %d threads.", nt);
475 
476  for (Int_t i = 0; i < nt; i++) {
477  if (!fUnzipThread[i]) {
478  TString nm("UnzipLoop");
479  nm += i;
480 
481  if (gDebug > 0)
482  Info("StartThreadUnzip", "Going to start thread '%s'", nm.Data());
483 
484  TTreeCacheUnzipData *d = new TTreeCacheUnzipData;
485  d->fInstance = this;
486  d->fCount = i;
487 
488  fUnzipThread[i] = new TThread(nm.Data(), UnzipLoop, (void*)d);
489  if (!fUnzipThread[i])
490  Error("TTreeCacheUnzip::StartThreadUnzip", " Unable to create new thread.");
491 
492  fUnzipThread[i]->Run();
493 
494  // There is at least one active thread
496 
497  }
498  }
499 
500  return (fActiveThread == kTRUE);
501 }
502 
503 ////////////////////////////////////////////////////////////////////////////////
504 /// To stop the thread we only need to change the value of the variable
505 /// fActiveThread to false and the loop will stop (of course, we will have)
506 /// to do the cleaning after that.
507 ///
508 /// Note: The syncronization part is important here or we will try to delete
509 /// teh object while it's still processing the queue
510 
512 {
514 
515  for (Int_t i = 0; i < 1; i++) {
516  if(fUnzipThread[i]){
517 
519 
520  if (fUnzipThread[i]->Exists()) {
521  fUnzipThread[i]->Join();
522  delete fUnzipThread[i];
523  }
524  }
525 
526  }
527 
528  return 1;
529 }
530 
531 ////////////////////////////////////////////////////////////////////////////////
532 /// This is a static function.
533 ///
534 /// This is the call that will be executed in the Thread generated by
535 /// StartThreadTreeCacheUnzip... what we want to do is to inflate the next
536 /// series of buffers leaving them in the second cache.
537 ///
538 /// Returns 0 when it finishes
539 
541 {
542  TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
543  TTreeCacheUnzip *unzipMng = d->fInstance;
544 
547 
548  Int_t thrnum = d->fCount;
549  Int_t startindex = thrnum;
550  Int_t locbuffsz = 16384;
551  char *locbuff = new char[16384];
552  Int_t res = 0;
553  Int_t myCycle = 0;
554 
555  while( unzipMng->IsActiveThread() ) {
556  res = 1;
557 
558  {
559  R__LOCKGUARD(unzipMng->fMutexList);
560  if (myCycle != unzipMng->fCycle) startindex = thrnum;
561  myCycle = unzipMng->fCycle;
562  if (unzipMng->fNseek) startindex = startindex % unzipMng->fNseek;
563  else startindex = -1;
564  }
565 
566  if (startindex >= 0)
567  res = unzipMng->UnzipCache(startindex, locbuffsz, locbuff);
568 
569  {
570  R__LOCKGUARD(unzipMng->fMutexList);
571 
572  if(!unzipMng->IsActiveThread()) break;
573 
574  if ((res == 1) || (!unzipMng->fIsTransferred)) {
575  unzipMng->WaitUnzipStartSignal();
576  startindex = unzipMng->fLastReadPos+3+thrnum;
577  }
578  }
579 
580  }
581 
582  delete d;
583  delete [] locbuff;
584  return (void *)0;
585 }
586 
587 ////////////////////////////////////////////////////////////////////////////////
588 // //
589 // From now on we have the methods concerning the unzipping part of the cache //
590 // //
591 ////////////////////////////////////////////////////////////////////////////////
592 
593 ////////////////////////////////////////////////////////////////////////////////
594 /// Read the logical record header from the buffer buf.
595 /// That must be the pointer tho the header part not the object by itself and
596 /// must contain data of at least maxbytes
597 /// Returns nread;
598 ///
599 /// In output arguments:
600 ///
601 /// - nbytes : number of bytes in record
602 /// if negative, this is a deleted record
603 /// if 0, cannot read record, wrong value of argument first
604 /// - objlen : uncompressed object size
605 /// - keylen : length of logical record header
606 ///
607 /// Note that the arguments objlen and keylen are returned only
608 /// if maxbytes >=16
609 /// Note: This was adapted from TFile... so some things dont apply
610 
611 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
612 {
613  Version_t versionkey;
614  Short_t klen;
615  UInt_t datime;
616  Int_t nb = 0,olen;
617  Int_t nread = maxbytes;
618  frombuf(buf,&nb);
619  nbytes = nb;
620  if (nb < 0) return nread;
621  // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
622  const Int_t headerSize = 16;
623  if (nread < headerSize) return nread;
624  frombuf(buf, &versionkey);
625  frombuf(buf, &olen);
626  frombuf(buf, &datime);
627  frombuf(buf, &klen);
628  if (!olen) olen = nbytes-klen;
629  objlen = olen;
630  keylen = klen;
631  return nread;
632 }
633 
634 ////////////////////////////////////////////////////////////////////////////////
635 /// This will delete the list of buffers that are in the unzipping cache
636 /// and will reset certain values in the cache.
637 /// This name is ambiguos because the method doesn't reset the whole cache,
638 /// only the part related to the unzipping
639 /// Note: This method is completely different from TTreeCache::ResetCache(),
640 /// in that method we were cleaning the prefetching buffer while here we
641 /// delete the information about the unzipped buffers
642 
644 {
645  {
647 
648  if (gDebug > 0)
649  Info("ResetCache", "Thread: %ld -- Resetting the cache. fNseek:%d fNSeekMax:%d fTotalUnzipBytes:%lld", TThread::SelfId(), fNseek, fNseekMax, fTotalUnzipBytes);
650 
651  // Reset all the lists and wipe all the chunks
652  fCycle++;
653  for (Int_t i = 0; i < fNseekMax; i++) {
654  if (fUnzipLen) fUnzipLen[i] = 0;
655  if (fUnzipChunks) {
656  if (fUnzipChunks[i]) delete [] fUnzipChunks[i];
657  fUnzipChunks[i] = 0;
658  }
659  if (fUnzipStatus) fUnzipStatus[i] = 0;
660 
661  }
662 
663  while (fActiveBlks.size()) fActiveBlks.pop();
664 
665  if(fNseekMax < fNseek){
666  if (gDebug > 0)
667  Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
668 
669  Byte_t *aUnzipStatus = new Byte_t[fNseek];
670  memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
671 
672  Int_t *aUnzipLen = new Int_t[fNseek];
673  memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
674 
675  char **aUnzipChunks = new char *[fNseek];
676  memset(aUnzipChunks, 0, fNseek*sizeof(char *));
677 
678  if (fUnzipStatus) delete [] fUnzipStatus;
679  if (fUnzipLen) delete [] fUnzipLen;
680  if (fUnzipChunks) delete [] fUnzipChunks;
681 
682  fUnzipStatus = aUnzipStatus;
683  fUnzipLen = aUnzipLen;
684  fUnzipChunks = aUnzipChunks;
685 
686  fNseekMax = fNseek;
687  }
688 
689  fLastReadPos = 0;
690  fTotalUnzipBytes = 0;
692  }
693 
695 
696 }
697 
698 ////////////////////////////////////////////////////////////////////////////////
699 /// We try to read a buffer that has already been unzipped
700 /// Returns -1 in case of read failure, 0 in case it's not in the
701 /// cache and n>0 in case read from cache (number of bytes copied).
702 /// pos and len are the original values as were passed to ReadBuffer
703 /// but instead we will return the inflated buffer.
704 /// Note!! : If *buf == 0 we will allocate the buffer and it will be the
705 /// responsability of the caller to free it... it is useful for example
706 /// to pass it to the creator of TBuffer
707 
709 {
710  Int_t res = 0;
711  Int_t loc = -1;
712 
713  {
715 
716  // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
717  // pointer to the original zipped chunk
718  // its index in the original unsorted offsets lists
719  //
720  // Actually there are situations in which copying the buffer is not
721  // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
722  // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
723  // better to be done in the main thread.
724 
725  // And now loc is the position of the chunk in the array of the sorted chunks
726  Int_t myCycle = fCycle;
727 
728  if (fParallel && !fIsLearning) {
729 
730  if(fNseekMax < fNseek){
731  if (gDebug > 0)
732  Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
733 
734  Byte_t *aUnzipStatus = new Byte_t[fNseek];
735  memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
736 
737  Int_t *aUnzipLen = new Int_t[fNseek];
738  memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
739 
740  char **aUnzipChunks = new char *[fNseek];
741  memset(aUnzipChunks, 0, fNseek*sizeof(char *));
742 
743  for (Int_t i = 0; i < fNseekMax; i++) {
744  aUnzipStatus[i] = fUnzipStatus[i];
745  aUnzipLen[i] = fUnzipLen[i];
746  aUnzipChunks[i] = fUnzipChunks[i];
747  }
748 
749  if (fUnzipStatus) delete [] fUnzipStatus;
750  if (fUnzipLen) delete [] fUnzipLen;
751  if (fUnzipChunks) delete [] fUnzipChunks;
752 
753  fUnzipStatus = aUnzipStatus;
754  fUnzipLen = aUnzipLen;
755  fUnzipChunks = aUnzipChunks;
756 
757  fNseekMax = fNseek;
758  }
759 
761  if ( (fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc]) ) {
762 
763  // The buffer is, at minimum, in the file cache. We must know its index in the requests list
764  // In order to get its info
765  Int_t seekidx = fSeekIndex[loc];
766 
767  fLastReadPos = seekidx;
768 
769  do {
770 
771  // If the block is ready we get it immediately.
772  // And also we don't have to alloc the blks. This is supposed to be
773  // the main thread of the app.
774  if ((fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0)) {
775 
776  //if (gDebug > 0)
777  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheHIT Block wanted: %d len:%d req_len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], len, fNseek);
778 
779  if(!(*buf)) {
780  *buf = fUnzipChunks[seekidx];
781  fUnzipChunks[seekidx] = 0;
782  fTotalUnzipBytes -= fUnzipLen[seekidx];
784  *free = kTRUE;
785  }
786  else {
787  memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
788  delete fUnzipChunks[seekidx];
789  fTotalUnzipBytes -= fUnzipLen[seekidx];
790  fUnzipChunks[seekidx] = 0;
792  *free = kFALSE;
793  }
794 
795  fNFound++;
796 
797  return fUnzipLen[seekidx];
798  }
799 
800  // If the status of the unzipped chunk is pending
801  // we wait on the condvar, hoping that the next signal is the good one
802  if ( fUnzipStatus[seekidx] == 1 ) {
803  //fMutexList->UnLock();
805  //fMutexList->Lock();
806 
807  if ( myCycle != fCycle ) {
808  if (gDebug > 0)
809  Info("GetUnzipBuffer", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
811 
812  fLastReadPos = 0;
813 
814  seekidx = -1;
815  break;
816  }
817 
818  }
819 
820  } while ( fUnzipStatus[seekidx] == 1 );
821 
822  //if (gDebug > 0)
823  // Info("GetUnzipBuffer", "------- Block wanted: %d status: %d len: %d chunk: %llx ", seekidx, fUnzipStatus[seekidx], fUnzipLen[seekidx], fUnzipChunks[seekidx]);
824 
825  // Here the block is not pending. It could be done or aborted or not yet being processed.
826  if ( (seekidx >= 0) && (fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0) ) {
827 
828  //if (gDebug > 0)
829  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheLateHIT Block wanted: %d len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], fNseek);
830 
831  if(!(*buf)) {
832  *buf = fUnzipChunks[seekidx];
833  fUnzipChunks[seekidx] = 0;
834  fTotalUnzipBytes -= fUnzipLen[seekidx];
836  *free = kTRUE;
837  }
838  else {
839  memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
840  delete fUnzipChunks[seekidx];
841  fTotalUnzipBytes -= fUnzipLen[seekidx];
842  fUnzipChunks[seekidx] = 0;
844  *free = kFALSE;
845  }
846 
847  fNStalls++;
848 
849  return fUnzipLen[seekidx];
850  }
851  else {
852  // This is a complete miss. We want to avoid the threads
853  // to try unzipping this block in the future.
854  fUnzipStatus[seekidx] = 2;
855  fUnzipChunks[seekidx] = 0;
856 
859 
860  //if (gDebug > 0)
861  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS Block wanted: %d len:%d fNseek:%d", seekidx, len, fNseek);
862  }
863 
864  } else {
865  loc = -1;
866  //fLastReadPos = 0;
868  }
869 
870  } else {
871  // We need to reset it for new transferences...
872  //ResetCache();
873  //TFileCacheRead::Prefetch(0,0);
874  }
875 
876  } // scope of the lock!
877 
878  if (len > fCompBufferSize) {
879  delete [] fCompBuffer;
880  fCompBuffer = new char[len];
881  fCompBufferSize = len;
882  } else {
883  if (fCompBufferSize > len*4) {
884  delete [] fCompBuffer;
885  fCompBuffer = new char[len*2];
886  fCompBufferSize = len*2;
887  }
888  }
889 
890  {
892  // Here we know that the async unzip of the wanted chunk
893  // was not done for some reason. We continue.
894 
895  res = 0;
896  if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
897  //Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS %d %d", loc, fNseek);
898  fFile->Seek(pos);
899  res = fFile->ReadBuffer(fCompBuffer, len);
900  }
901 
902  if (res) res = -1;
903 
904  } // scope of the lock!
905 
906  if (!res) {
907  res = UnzipBuffer(buf, fCompBuffer);
908  *free = kTRUE;
909  }
910 
911  if (!fIsLearning) {
912  fNMissed++;
913  }
914 
915  return res;
916 
917 }
918 
919 ////////////////////////////////////////////////////////////////////////////////
920 /// static function: Sets the unzip relatibe buffer size
921 
923 {
924  fgRelBuffSize = relbufferSize;
925 }
926 
927 ////////////////////////////////////////////////////////////////////////////////
928 /// Sets the size for the unzipping cache... by default it should be
929 /// two times the size of the prefetching cache
930 
932 {
934 
935  fUnzipBufferSize = bufferSize;
936 }
937 
938 ////////////////////////////////////////////////////////////////////////////////
939 /// Unzips a ROOT specific buffer... by reading the header at the beginning.
940 /// returns the size of the inflated buffer or -1 if error
941 /// Note!! : If *dest == 0 we will allocate the buffer and it will be the
942 /// responsability of the caller to free it... it is useful for example
943 /// to pass it to the creator of TBuffer
944 /// src is the original buffer with the record (header+compressed data)
945 /// *dest is the inflated buffer (including the header)
946 
948 {
949  Int_t uzlen = 0;
950  Bool_t alloc = kFALSE;
951 
952  // Here we read the header of the buffer
953  const Int_t hlen=128;
954  Int_t nbytes=0, objlen=0, keylen=0;
955  GetRecordHeader(src, hlen, nbytes, objlen, keylen);
956 
957  if (!(*dest)) {
958  /* early consistency check */
959  UChar_t *bufcur = (UChar_t *) (src + keylen);
960  Int_t nin, nbuf;
961  if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
962  Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
963  uzlen = -1;
964  return uzlen;
965  }
966  Int_t l = keylen+objlen;
967  *dest = new char[l];
968  alloc = kTRUE;
969  }
970  // Must unzip the buffer
971  // fSeekPos[ind]; adress of zipped buffer
972  // fSeekLen[ind]; len of the zipped buffer
973  // &fBuffer[fSeekPos[ind]]; memory address
974 
975  // This is similar to TBasket::ReadBasketBuffers
976  Bool_t oldCase = objlen==nbytes-keylen
977  && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
978  && fFile->GetVersion()<=30401;
979 
980  if (objlen > nbytes-keylen || oldCase) {
981 
982  // Copy the key
983  memcpy(*dest, src, keylen);
984  uzlen += keylen;
985 
986  char *objbuf = *dest + keylen;
987  UChar_t *bufcur = (UChar_t *) (src + keylen);
988  Int_t nin, nbuf;
989  Int_t nout = 0;
990  Int_t noutot = 0;
991 
992  while (1) {
993  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
994  if (hc!=0) break;
995  if (gDebug > 2)
996  Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
997  nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
998  if (oldCase && (nin > objlen || nbuf > objlen)) {
999  if (gDebug > 2)
1000  Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
1001 
1002  //buffer was very likely not compressed in an old version
1003  memcpy( *dest + keylen, src + keylen, objlen);
1004  uzlen += objlen;
1005  return uzlen;
1006  }
1007 
1008  R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1009 
1010  if (gDebug > 2)
1011  Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1012  nin, bufcur, nbuf, objbuf, nout);
1013 
1014  if (!nout) break;
1015  noutot += nout;
1016  if (noutot >= objlen) break;
1017  bufcur += nin;
1018  objbuf += nout;
1019  }
1020 
1021  if (noutot != objlen) {
1022  Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1023  nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1024  uzlen = -1;
1025  if(alloc) delete [] *dest;
1026  *dest = 0;
1027  return uzlen;
1028  }
1029  uzlen += objlen;
1030  } else {
1031  memcpy(*dest, src, keylen);
1032  uzlen += keylen;
1033  memcpy(*dest + keylen, src + keylen, objlen);
1034  uzlen += objlen;
1035  }
1036  return uzlen;
1037 }
1038 
1039 ////////////////////////////////////////////////////////////////////////////////
1040 /// This inflates all the buffers in the cache.. passing the data to a new
1041 /// buffer that will only wait there to be read...
1042 /// We can not inflate all the buffers in the cache so we will try to do
1043 /// it until the cache gets full... there is a member called fUnzipBufferSize which will
1044 /// tell us the max size we can allocate for this cache.
1045 ///
1046 /// note that we will unzip in the order they were put into the cache not
1047 /// the order of the transference so it has to be read in that order or the
1048 /// pre-unzipping will be useless.
1049 ///
1050 /// startindex is used as start index to check for blks to be unzipped
1051 ///
1052 /// returns 0 in normal conditions or -1 if error, 1 if it would like to sleep
1053 ///
1054 /// This func is supposed to compete among an indefinite number of threads to get a chunk to inflate
1055 /// in order to accommodate multiple unzippers
1056 /// Since everything is so async, we cannot use a fixed buffer, we are forced to keep
1057 /// the individual chunks as separate blocks, whose summed size does not exceed the maximum
1058 /// allowed. The pointers are kept globally in the array fUnzipChunks
1059 
1060 Int_t TTreeCacheUnzip::UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
1061 {
1062  Int_t myCycle;
1063  const Int_t hlen=128;
1064  Int_t objlen=0, keylen=0;
1065  Int_t nbytes=0;
1066  Int_t readbuf = 0;
1067 
1068  Int_t idxtounzip = -1;
1069  Long64_t rdoffs = 0;
1070  Int_t rdlen = 0;
1071  {
1073 
1074  if (!IsActiveThread() || !fNseek || fIsLearning || !fIsTransferred) {
1075  if (gDebug > 0)
1076  Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1078  return 1;
1079  }
1080 
1081  // To synchronize with the 'paging'
1082  myCycle = fCycle;
1083 
1084  // Try to look for a blk to unzip
1085  idxtounzip = -1;
1086  rdoffs = 0;
1087  rdlen = 0;
1089 
1090  if (fBlocksToGo > 0) {
1091  for (Int_t ii=0; ii < fNseek; ii++) {
1092  Int_t reqi = (startindex+ii) % fNseek;
1093  if (!fUnzipStatus[reqi] && (fSeekLen[reqi] > 256) ) {
1094  // We found a chunk which is not unzipped nor pending
1095  fUnzipStatus[reqi] = 1; // Set it as pending
1096  idxtounzip = reqi;
1097 
1098  rdoffs = fSeek[idxtounzip];
1099  rdlen = fSeekLen[idxtounzip];
1100  break;
1101  }
1102  }
1103  if (idxtounzip < 0) fBlocksToGo = 0;
1104  }
1105  }
1106 
1107  } // lock scope
1108 
1109  if (idxtounzip < 0) {
1110  if (gDebug > 0)
1111  Info("UnzipCache", "Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1112  startindex, fTotalUnzipBytes, fUnzipBufferSize, fNseek );
1113  return 1;
1114  }
1115 
1116  // And here we have a new blk to unzip
1117  startindex = idxtounzip+THREADCNT;
1118 
1119  if (!IsActiveThread() || !fNseek || fIsLearning ) {
1120  if (gDebug > 0)
1121  Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1123  return 1;
1124  }
1125 
1126  Int_t loc = -1;
1127 
1128  // Prepare a static tmp buf of adequate size
1129  if(locbuffsz < rdlen) {
1130  if (locbuff) delete [] locbuff;
1131  locbuffsz = rdlen;
1132  locbuff = new char[locbuffsz];
1133  //memset(locbuff, 0, locbuffsz);
1134  } else if(locbuffsz > rdlen*3) {
1135  if (locbuff) delete [] locbuff;
1136  locbuffsz = rdlen*2;
1137  locbuff = new char[locbuffsz];
1138  //memset(locbuff, 0, locbuffsz);
1139  }
1140 
1141  if (gDebug > 0)
1142  Info("UnzipCache", "Going to unzip block %d", idxtounzip);
1143 
1144  readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
1145 
1146  {
1148 
1149  if ( (myCycle != fCycle) || !fIsTransferred ) {
1150  if (gDebug > 0)
1151  Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1153 
1154  fUnzipStatus[idxtounzip] = 2; // Set it as not done
1155  fUnzipChunks[idxtounzip] = 0;
1156  fUnzipLen[idxtounzip] = 0;
1158 
1159  startindex = 0;
1160  return 1;
1161  }
1162 
1163  if (readbuf <= 0) {
1164  fUnzipStatus[idxtounzip] = 2; // Set it as not done
1165  fUnzipChunks[idxtounzip] = 0;
1166  fUnzipLen[idxtounzip] = 0;
1167  if (gDebug > 0)
1168  Info("UnzipCache", "Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1169  return -1;
1170  }
1171 
1172  GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
1173 
1174  Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes;
1175 
1176  // If the single unzipped chunk is really too big, reset it to not processable
1177  // I.e. mark it as done but set the pointer to 0
1178  // This block will be unzipped synchronously in the main thread
1179  if (len > 4*fUnzipBufferSize) {
1180 
1181  //if (gDebug > 0)
1182  Info("UnzipCache", "Block %d is too big, skipping.", idxtounzip);
1183 
1184  fUnzipStatus[idxtounzip] = 2; // Set it as done
1185  fUnzipChunks[idxtounzip] = 0;
1186  fUnzipLen[idxtounzip] = 0;
1187 
1189  return 0;
1190  }
1191 
1192  } // Scope of the lock
1193 
1194  // Unzip it into a new blk
1195  char *ptr = 0;
1196  Int_t loclen = 0;
1197 
1198  loclen = UnzipBuffer(&ptr, locbuff);
1199 
1200  if ((loclen > 0) && (loclen == objlen+keylen)) {
1202 
1203  if ( (myCycle != fCycle) || !fIsTransferred) {
1204  if (gDebug > 0)
1205  Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1207  delete [] ptr;
1208 
1209  fUnzipStatus[idxtounzip] = 2; // Set it as not done
1210  fUnzipChunks[idxtounzip] = 0;
1211  fUnzipLen[idxtounzip] = 0;
1212 
1213  startindex = 0;
1215  return 1;
1216  }
1217 
1218  fUnzipStatus[idxtounzip] = 2; // Set it as done
1219  fUnzipChunks[idxtounzip] = ptr;
1220  fUnzipLen[idxtounzip] = loclen;
1221  fTotalUnzipBytes += loclen;
1222 
1223  fActiveBlks.push(idxtounzip);
1224 
1225  if (gDebug > 0)
1226  Info("UnzipCache", "reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1227  idxtounzip, rdoffs, rdlen, loclen);
1228 
1229  fNUnzip++;
1230  }
1231  else {
1233  Info("argh", "loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1234  fUnzipStatus[idxtounzip] = 2; // Set it as done
1235  fUnzipChunks[idxtounzip] = 0;
1236  fUnzipLen[idxtounzip] = 0;
1237  }
1238 
1240 
1241  delete [] ptr;
1242  return 0;
1243 }
1244 
1245 void TTreeCacheUnzip::Print(Option_t* option) const {
1246 
1247  printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
1248  printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
1249  printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
1250  printf("Number of hits: %d\n", fNFound);
1251  printf("Number of stalls: %d\n", fNStalls);
1252  printf("Number of misses: %d\n", fNMissed);
1253 
1254  TTreeCache::Print(option);
1255 }
1256 
1257 ////////////////////////////////////////////////////////////////////////////////
1258 
1261  return TTreeCache::ReadBufferExt(buf, pos, len, loc);
1262 
1263 }
TCondition * fUnzipStartCondition
Used to signal the threads to start.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Definition: TThread.cxx:653
Int_t fNtot
Total size of prefetched blocks.
Definition: TMutex.h:34
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:282
Long64_t fEntryMax
! last entry in the cache
Definition: TTreeCache.h:41
Long64_t * GetBasketEntry() const
Definition: TBranch.h:150
Int_t fNUnzip
! number of blocks that were unzipped
Long64_t GetNextEntry()
Definition: TTree.h:289
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
TFile * fFile
Pointer to file.
long long Long64_t
Definition: RtypesCore.h:69
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
short Version_t
Definition: RtypesCore.h:61
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
Definition: TBranch.h:177
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
float Float_t
Definition: RtypesCore.h:53
Int_t fNStalls
! number of hits which caused a stall
TObjArray * fBranches
! List of branches to be stored in the cache
Definition: TTreeCache.h:48
const char Option_t
Definition: RtypesCore.h:62
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It&#39;s the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
TMutex * fMutexList
Mutex to protect the various lists. Used by the condvars.
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:34
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition: TFile.cxx:2105
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Byte_t * fUnzipStatus
! [fNSeek] For each blk, tells us if it&#39;s unzipped or pending
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1602
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
TDirectory * GetDirectory() const
Definition: TBranch.h:158
Long64_t fEntryMin
! first entry in the cache
Definition: TTreeCache.h:40
Basic string class.
Definition: TString.h:137
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
Definition: TThread.cxx:633
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
Int_t * fUnzipLen
! [fNseek] Length of the unzipped buffers
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
const char * Class
Definition: TXMLSetup.cxx:64
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
Definition: TTreeCache.cxx:332
Long64_t * GetTreeOffset() const
Definition: TChain.h:119
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
Bool_t fActiveThread
Used to terminate gracefully the unzippers.
Int_t fNFound
! number of blocks that were found in the cache
static Bool_t IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
std::queue< Int_t > fActiveBlks
The blocks which are active now.
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Helper class to iterate over cluster of baskets.
Definition: TTree.h:256
Int_t * GetBasketBytes() const
Definition: TBranch.h:149
Int_t GetVersion() const
Definition: TFile.h:214
static void * UnzipLoop(void *arg)
This is a static function.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2440
#define THREADCNT
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
unsigned char Byte_t
Definition: RtypesCore.h:60
virtual Long64_t GetReadEntry() const
Definition: TTree.h:436
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:171
void Init(TClassEdit::TInterpreterLookupHelper *helper)
Definition: TClassEdit.cxx:119
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
Definition: TTree.cxx:5042
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void Print(Option_t *option="") const
Print cache statistics.
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:538
Specialization of TTreeCache for parallel Unzipping.
Int_t Broadcast()
Definition: TCondition.h:58
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:552
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
virtual TFile * GetFile() const
Definition: TDirectory.h:155
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
! pointer to the current Tree
Definition: TTreeCache.h:50
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponfing to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
TTree * GetTree() const
Definition: TTreeCache.h:88
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:496
Int_t fNseekMax
! fNseek can change so we need to know its max size
unsigned int UInt_t
Definition: RtypesCore.h:42
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
Int_t GetMaxBaskets() const
Definition: TBranch.h:180
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
short Short_t
Definition: RtypesCore.h:35
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
TLine * l
Definition: textangle.C:4
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:499
Int_t fNReadPref
Number of blocks that were prefetched.
Definition: TTreeCache.h:47
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Definition: TBranch.cxx:1160
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition: TTreeCache.h:42
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:91
virtual void Print(Option_t *option="") const
Print cache statistics.
#define ClassImp(name)
Definition: Rtypes.h:279
double Double_t
Definition: RtypesCore.h:55
Bool_t fIsLearning
! true if cache is in learning mode
Definition: TTreeCache.h:51
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
virtual Int_t GetTreeNumber() const
Definition: TChain.h:118
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
virtual Int_t GetBufferSize() const
TCondition * fUnzipDoneCondition
Used to wait for an unzip tour to finish. Gives the Async feel.
#define free
Definition: civetweb.c:821
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relatibe buffer size
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
virtual Long64_t GetEntries() const
Definition: TTree.h:393
char ** fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
Int_t fNbranches
! Number of branches in the cache
Definition: TTreeCache.h:44
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Definition: triangle.c:1040
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containg TTree objects.
Definition: TChain.h:35
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Definition: TRolke.cxx:630
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
Definition: tree.py:1
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
Definition: TCondition.cxx:114
A TTree object has a header with a name and a title.
Definition: TTree.h:98
unsigned char UChar_t
Definition: RtypesCore.h:34
TEventList * GetEventList() const
Definition: TTree.h:403
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:416
virtual Int_t GetSize() const
Definition: TCollection.h:95
A TTree is a list of TBranches.
Definition: TBranch.h:58
Long64_t fEntryNext
! next entry number where cache must be filled
Definition: TTreeCache.h:43
Long64_t fTotalUnzipBytes
! The total sum of the currently unzipped blks
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition: TMath.h:931
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:5011
Int_t fNseek
Number of blocks to be prefetched.
Int_t Signal()
Definition: TCondition.h:57
const char * Data() const
Definition: TString.h:349