ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1 // @(#)root/tree:$Id$
2 // Author: Leandro Franco 10/04/2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TTreeCacheUnzip
13 
14 Specialization of TTreeCache for parallel Unzipping.
15 
16 Fabrizio Furano (CERN) Aug 2009
17 Core TTree-related code borrowed from the previous version
18  by Leandro Franco and Rene Brun
19 
20 ## Parallel Unzipping
21 
22 TTreeCache has been specialised in order to let additional threads
23 free to unzip in advance its content. In this implementation we
24 support up to 10 threads, but right now it makes more sense to
25 limit their number to 1-2
26 
27 The application reading data is carefully synchronized, in order to:
28  - if the block it wants is not unzipped, it self-unzips it without
29  waiting
30  - if the block is being unzipped in parallel, it waits only
31  for that unzip to finish
32  - if the block has already been unzipped, it takes it
33 
34 This is supposed to hide part of the unzipping latency, at the
35 expense of CPU time.
36 
37 The default parameters are the same as in the previous version, i.e. 20%
38 of the TTreeCache cache size. To change it use
39 TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)
40 where bufferSize must be passed in bytes.
41 */
42 
43 #include "TTreeCacheUnzip.h"
44 #include "TChain.h"
45 #include "TBranch.h"
46 #include "TFile.h"
47 #include "TEventList.h"
48 #include "TMutex.h"
49 #include "TVirtualMutex.h"
50 #include "TThread.h"
51 #include "TCondition.h"
52 #include "TMath.h"
53 #include "Bytes.h"
54 
55 #include "TEnv.h"
56 
57 #define THREADCNT 2
58 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
59 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
60 
62 
63 // The unzip cache does not consume memory by itself, it just allocates in advance
64 // mem blocks which are then picked as they are by the baskets.
65 // Hence there is no good reason to limit it too much
67 
69 
70 ////////////////////////////////////////////////////////////////////////////////
71 
73 
74  fActiveThread(kFALSE),
75  fAsyncReading(kFALSE),
76  fCycle(0),
77  fLastReadPos(0),
78  fBlocksToGo(0),
79  fUnzipLen(0),
80  fUnzipChunks(0),
81  fUnzipStatus(0),
82  fTotalUnzipBytes(0),
83  fNseekMax(0),
84  fUnzipBufferSize(0),
85  fNUnzip(0),
86  fNFound(0),
87  fNStalls(0),
88  fNMissed(0)
89 
90 {
91  // Default Constructor.
92 
93  Init();
94 }
95 
96 ////////////////////////////////////////////////////////////////////////////////
97 /// Constructor.
98 
99 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
100  fActiveThread(kFALSE),
101  fAsyncReading(kFALSE),
102  fCycle(0),
103  fLastReadPos(0),
104  fBlocksToGo(0),
105  fUnzipLen(0),
106  fUnzipChunks(0),
107  fUnzipStatus(0),
108  fTotalUnzipBytes(0),
109  fNseekMax(0),
110  fUnzipBufferSize(0),
111  fNUnzip(0),
112  fNFound(0),
113  fNStalls(0),
114  fNMissed(0)
115 {
116  Init();
117 }
118 
119 ////////////////////////////////////////////////////////////////////////////////
120 /// Initialization procedure common to all the constructors.
121 
123 {
124  fMutexList = new TMutex(kTRUE);
125  fIOMutex = new TMutex(kTRUE);
126 
129 
130  fTotalUnzipBytes = 0;
131 
132  fCompBuffer = new char[16384];
133  fCompBufferSize = 16384;
134 
135  if (fgParallel == kDisable) {
136  fParallel = kFALSE;
137  }
138  else if(fgParallel == kEnable || fgParallel == kForce) {
139  SysInfo_t info;
140  gSystem->GetSysInfo(&info);
141 
143 
144  if(gDebug > 0)
145  Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
146 
147  fParallel = kTRUE;
148 
149  for (Int_t i = 0; i < 10; i++) fUnzipThread[i] = 0;
150 
152 
153  }
154  else {
155  Warning("TTreeCacheUnzip", "Parallel Option unknown");
156  }
157 
158  // Check if asynchronous reading is supported by this TFile specialization
159  if (gEnv->GetValue("TFile.AsyncReading", 1)) {
160  if (fFile && !(fFile->ReadBufferAsync(0, 0)))
162  }
163 
164 }
165 
166 ////////////////////////////////////////////////////////////////////////////////
167 /// Destructor. (in general called by the TFile destructor)
168 
170 {
171  ResetCache();
172 
173  if (IsActiveThread())
174  StopThreadUnzip();
175 
176  delete [] fUnzipLen;
177 
178  delete fUnzipStartCondition;
179  delete fUnzipDoneCondition;
180 
181  delete fMutexList;
182  delete fIOMutex;
183 
184  delete [] fUnzipStatus;
185  delete [] fUnzipChunks;
186 }
187 
188 ////////////////////////////////////////////////////////////////////////////////
189 /// Add a branch to the list of branches to be stored in the cache
190 /// this function is called by TBranch::GetBasket
191 /// Returns:
192 /// - 0 branch added or already included
193 /// - -1 on error
194 
195 Int_t TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
196 {
198 
199  return TTreeCache::AddBranch(b, subbranches);
200 }
201 
202 ////////////////////////////////////////////////////////////////////////////////
203 /// Add a branch to the list of branches to be stored in the cache
204 /// this function is called by TBranch::GetBasket
205 /// Returns:
206 /// - 0 branch added or already included
207 /// - -1 on error
208 
209 Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
210 {
212 
213  return TTreeCache::AddBranch(branch, subbranches);
214 }
215 
216 ////////////////////////////////////////////////////////////////////////////////
217 
219 {
220  if (fNbranches <= 0) return kFALSE;
221  {
222  // Fill the cache buffer with the branches in the cache.
225 
227  Long64_t entry = tree->GetReadEntry();
228 
229  // If the entry is in the range we previously prefetched, there is
230  // no point in retrying. Note that this will also return false
231  // during the training phase (fEntryNext is then set intentional to
232  // the end of the training phase).
233  if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
234 
235  // Triggered by the user, not the learning phase
236  if (entry == -1) entry=0;
237 
238  TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
239  fEntryCurrent = clusterIter();
240  fEntryNext = clusterIter.GetNextEntry();
241 
243  if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
245 
246  // Check if owner has a TEventList set. If yes we optimize for this
247  // Special case reading only the baskets containing entries in the
248  // list.
249  TEventList *elist = fTree->GetEventList();
250  Long64_t chainOffset = 0;
251  if (elist) {
252  if (fTree->IsA() ==TChain::Class()) {
253  TChain *chain = (TChain*)fTree;
254  Int_t t = chain->GetTreeNumber();
255  chainOffset = chain->GetTreeOffset()[t];
256  }
257  }
258 
259  //clear cache buffer
261 
262  //store baskets
263  for (Int_t i=0;i<fNbranches;i++) {
265  if (b->GetDirectory()==0) continue;
266  if (b->GetDirectory()->GetFile() != fFile) continue;
267  Int_t nb = b->GetMaxBaskets();
268  Int_t *lbaskets = b->GetBasketBytes();
269  Long64_t *entries = b->GetBasketEntry();
270  if (!lbaskets || !entries) continue;
271  //we have found the branch. We now register all its baskets
272  //from the requested offset to the basket below fEntrymax
273  Int_t blistsize = b->GetListOfBaskets()->GetSize();
274  for (Int_t j=0;j<nb;j++) {
275  // This basket has already been read, skip it
276  if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
277 
278  Long64_t pos = b->GetBasketSeek(j);
279  Int_t len = lbaskets[j];
280  if (pos <= 0 || len <= 0) continue;
281  //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
282  if (entries[j] >= fEntryNext) continue;
283  if (entries[j] < entry && (j<nb-1 && entries[j+1] <= entry)) continue;
284  if (elist) {
285  Long64_t emax = fEntryMax;
286  if (j<nb-1) emax = entries[j+1]-1;
287  if (!elist->ContainsRange(entries[j]+chainOffset,emax+chainOffset)) continue;
288  }
289  fNReadPref++;
290 
291  TFileCacheRead::Prefetch(pos,len);
292  }
293  if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n",entry,((TBranch*)fBranches->UncheckedAt(i))->GetName(),fEntryNext,fNseek,fNtot);
294  }
295 
296  // Now fix the size of the status arrays
297  ResetCache();
298 
300 
301  }
302 
303  return kTRUE;
304 }
305 
306 ////////////////////////////////////////////////////////////////////////////////
307 /// Change the underlying buffer size of the cache.
308 /// Returns:
309 /// - 0 if the buffer content is still available
310 /// - 1 if some or all of the buffer content has been made unavailable
311 /// - -1 on error
312 
314 {
316 
317  Int_t res = TTreeCache::SetBufferSize(buffersize);
318  if (res < 0) {
319  return res;
320  }
322  ResetCache();
323  return 1;
324 }
325 
326 ////////////////////////////////////////////////////////////////////////////////
327 /// Set the minimum and maximum entry number to be processed
328 /// this information helps to optimize the number of baskets to read
329 /// when prefetching the branch buffers.
330 
332 {
334 
335  TTreeCache::SetEntryRange(emin, emax);
336 }
337 
338 ////////////////////////////////////////////////////////////////////////////////
339 /// It's the same as TTreeCache::StopLearningPhase but we guarantee that
340 /// we start the unzipping just after getting the buffers
341 
343 {
345 
347 
348 }
349 
350 ////////////////////////////////////////////////////////////////////////////////
351 ///update pointer to current Tree and recompute pointers to the branches in the cache
352 
354 {
356 
358 }
359 
360 ////////////////////////////////////////////////////////////////////////////////
361 // //
362 // From now on we have the methods concerning the threading part of the cache //
363 // //
364 ////////////////////////////////////////////////////////////////////////////////
365 
366 ////////////////////////////////////////////////////////////////////////////////
367 /// Static function that returns the parallel option
368 /// (to indicate an additional thread)
369 
371 {
372  return fgParallel;
373 }
374 
375 ////////////////////////////////////////////////////////////////////////////////
376 /// Static function that tells whether the multithreaded unzipping is activated
377 
379 {
380  if (fgParallel == kEnable || fgParallel == kForce)
381  return kTRUE;
382 
383  return kFALSE;
384 }
385 
386 ////////////////////////////////////////////////////////////////////////////////
387 /// This indicates if the thread is active in this moment...
388 /// this variable is very important because if we change it from true to
389 /// false the thread will stop... ( see StopThreadUnzip() )
390 
392 {
394 
395  return fActiveThread;
396 }
397 
398 ////////////////////////////////////////////////////////////////////////////////
399 /// It says if the queue is empty... useful to see if we have to process it.
400 
402 {
404 
405  if ( fIsLearning )
406  return kTRUE;
407 
408  return kFALSE;
409 }
410 
412 {
413  // Here the threads sleep waiting for some blocks to unzip
414 
416 
417 }
418 
419 ////////////////////////////////////////////////////////////////////////////////
420 /// This will send the signal corresponding to the queue... normally used
421 /// when we want to start processing the list of buffers.
422 
424 {
425  if (gDebug > 0) Info("SendSignal", " fUnzipCondition->Signal()");
426 
427  if (broadcast)
429  else
431 }
432 
433 ////////////////////////////////////////////////////////////////////////////////
434 /// Static function that (de)activates multithreading unzipping
435 ///
436 /// The possible options are:
437 /// - kEnable _Enable_ it, which causes an automatic detection and launches the
438 /// additional thread if the number of cores in the machine is greater than
439 /// one
440 /// - kDisable _Disable_ will not activate the additional thread.
441 /// - kForce _Force_ will start the additional thread even if there is only one
442 /// core. the default will be taken as kEnable.
443 ///
444 /// Returns 0 if there was an error, 1 otherwise.
445 
447 {
449  fgParallel = option;
450  return 1;
451  }
452  return 0;
453 }
454 
455 class TTreeCacheUnzipData {
456 public:
457  TTreeCacheUnzip *fInstance;
458  Int_t fCount;
459 };
460 
461 ////////////////////////////////////////////////////////////////////////////////
462 /// The Thread is only a part of the TTreeCache but it is the part that
463 /// waits for info in the queue and processes it... unfortunately, a Thread is
464 /// not an object and we have to deal with it in the old C-style way
465 /// Returns 0 if the thread was initialized or 1 if it was already running
466 
468 {
469  Int_t nt = nthreads;
470  if (nt > 10) nt = 10;
471 
472  if (gDebug > 0)
473  Info("StartThreadUnzip", "Going to start %d threads.", nt);
474 
475  for (Int_t i = 0; i < nt; i++) {
476  if (!fUnzipThread[i]) {
477  TString nm("UnzipLoop");
478  nm += i;
479 
480  if (gDebug > 0)
481  Info("StartThreadUnzip", "Going to start thread '%s'", nm.Data());
482 
483  TTreeCacheUnzipData *d = new TTreeCacheUnzipData;
484  d->fInstance = this;
485  d->fCount = i;
486 
487  fUnzipThread[i] = new TThread(nm.Data(), UnzipLoop, (void*)d);
488  if (!fUnzipThread[i])
489  Error("TTreeCacheUnzip::StartThreadUnzip", " Unable to create new thread.");
490 
491  fUnzipThread[i]->Run();
492 
493  // There is at least one active thread
495 
496  }
497  }
498 
499  return (fActiveThread == kTRUE);
500 }
501 
502 ////////////////////////////////////////////////////////////////////////////////
503 /// To stop the thread we only need to change the value of the variable
504 /// fActiveThread to false and the loop will stop (of course, we will have)
505 /// to do the cleaning after that.
506 ///
507 /// Note: The synchronization part is important here or we will try to delete
508 /// the object while it's still processing the queue
509 
511 {
513 
514  for (Int_t i = 0; i < 1; i++) {
515  if(fUnzipThread[i]){
516 
518 
519  if (fUnzipThread[i]->Exists()) {
520  fUnzipThread[i]->Join();
521  delete fUnzipThread[i];
522  }
523  }
524 
525  }
526 
527  return 1;
528 }
529 
530 ////////////////////////////////////////////////////////////////////////////////
531 /// This is a static function.
532 ///
533 /// This is the call that will be executed in the Thread generated by
534 /// StartThreadUnzip... what we want to do is to inflate the next
535 /// series of buffers leaving them in the second cache.
536 ///
537 /// Returns 0 when it finishes
538 
540 {
541  TTreeCacheUnzipData *d = (TTreeCacheUnzipData *)arg;
542  TTreeCacheUnzip *unzipMng = d->fInstance;
543 
546 
547  Int_t thrnum = d->fCount;
548  Int_t startindex = thrnum;
549  Int_t locbuffsz = 16384;
550  char *locbuff = new char[16384];
551  Int_t res = 0;
552  Int_t myCycle = 0;
553 
554  while( unzipMng->IsActiveThread() ) {
555  res = 1;
556 
557  {
558  R__LOCKGUARD(unzipMng->fMutexList);
559  if (myCycle != unzipMng->fCycle) startindex = thrnum;
560  myCycle = unzipMng->fCycle;
561  if (unzipMng->fNseek) startindex = startindex % unzipMng->fNseek;
562  else startindex = -1;
563  }
564 
565  if (startindex >= 0)
566  res = unzipMng->UnzipCache(startindex, locbuffsz, locbuff);
567 
568  {
569  R__LOCKGUARD(unzipMng->fMutexList);
570 
571  if(!unzipMng->IsActiveThread()) break;
572 
573  if ((res == 1) || (!unzipMng->fIsTransferred)) {
574  unzipMng->WaitUnzipStartSignal();
575  startindex = unzipMng->fLastReadPos+3+thrnum;
576  }
577  }
578 
579  }
580 
581  delete d;
582  delete [] locbuff;
583  return (void *)0;
584 }
585 
586 ////////////////////////////////////////////////////////////////////////////////
587 // //
588 // From now on we have the methods concerning the unzipping part of the cache //
589 // //
590 ////////////////////////////////////////////////////////////////////////////////
591 
592 ////////////////////////////////////////////////////////////////////////////////
593 /// Read the logical record header from the buffer buf.
594 /// That must be the pointer to the header part, not to the object itself, and
595 /// must contain data of at least maxbytes
596 /// Returns nread;
597 ///
598 /// In output arguments:
599 ///
600 /// - nbytes : number of bytes in record
601 /// if negative, this is a deleted record
602 /// if 0, cannot read record, wrong value of argument first
603 /// - objlen : uncompressed object size
604 /// - keylen : length of logical record header
605 ///
606 /// Note that the arguments objlen and keylen are returned only
607 /// if maxbytes >=16
608 /// Note: This was adapted from TFile... so some things dont apply
609 
610 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
611 {
612  Version_t versionkey;
613  Short_t klen;
614  UInt_t datime;
615  Int_t nb = 0,olen;
616  Int_t nread = maxbytes;
617  frombuf(buf,&nb);
618  nbytes = nb;
619  if (nb < 0) return nread;
620  // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
621  const Int_t headerSize = 16;
622  if (nread < headerSize) return nread;
623  frombuf(buf, &versionkey);
624  frombuf(buf, &olen);
625  frombuf(buf, &datime);
626  frombuf(buf, &klen);
627  if (!olen) olen = nbytes-klen;
628  objlen = olen;
629  keylen = klen;
630  return nread;
631 }
632 
633 ////////////////////////////////////////////////////////////////////////////////
634 /// This will delete the list of buffers that are in the unzipping cache
635 /// and will reset certain values in the cache.
636 /// This name is ambiguous because the method doesn't reset the whole cache,
637 /// only the part related to the unzipping
638 /// Note: This method is completely different from TTreeCache::ResetCache(),
639 /// in that method we were cleaning the prefetching buffer while here we
640 /// delete the information about the unzipped buffers
641 
643 {
644  {
646 
647  if (gDebug > 0)
648  Info("ResetCache", "Thread: %ld -- Resetting the cache. fNseek:%d fNSeekMax:%d fTotalUnzipBytes:%lld", TThread::SelfId(), fNseek, fNseekMax, fTotalUnzipBytes);
649 
650  // Reset all the lists and wipe all the chunks
651  fCycle++;
652  for (Int_t i = 0; i < fNseekMax; i++) {
653  if (fUnzipLen) fUnzipLen[i] = 0;
654  if (fUnzipChunks) {
655  if (fUnzipChunks[i]) delete [] fUnzipChunks[i];
656  fUnzipChunks[i] = 0;
657  }
658  if (fUnzipStatus) fUnzipStatus[i] = 0;
659 
660  }
661 
662  while (fActiveBlks.size()) fActiveBlks.pop();
663 
664  if(fNseekMax < fNseek){
665  if (gDebug > 0)
666  Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
667 
668  Byte_t *aUnzipStatus = new Byte_t[fNseek];
669  memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
670 
671  Int_t *aUnzipLen = new Int_t[fNseek];
672  memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
673 
674  char **aUnzipChunks = new char *[fNseek];
675  memset(aUnzipChunks, 0, fNseek*sizeof(char *));
676 
677  if (fUnzipStatus) delete [] fUnzipStatus;
678  if (fUnzipLen) delete [] fUnzipLen;
679  if (fUnzipChunks) delete [] fUnzipChunks;
680 
681  fUnzipStatus = aUnzipStatus;
682  fUnzipLen = aUnzipLen;
683  fUnzipChunks = aUnzipChunks;
684 
685  fNseekMax = fNseek;
686  }
687 
688  fLastReadPos = 0;
689  fTotalUnzipBytes = 0;
691  }
692 
694 
695 }
696 
697 ////////////////////////////////////////////////////////////////////////////////
698 /// We try to read a buffer that has already been unzipped
699 /// Returns -1 in case of read failure, 0 in case it's not in the
700 /// cache and n>0 in case read from cache (number of bytes copied).
701 /// pos and len are the original values as were passed to ReadBuffer
702 /// but instead we will return the inflated buffer.
703 /// Note!! : If *buf == 0 we will allocate the buffer and it will be the
704 /// responsibility of the caller to free it... it is useful for example
705 /// to pass it to the creator of TBuffer
706 
708 {
709  Int_t res = 0;
710  Int_t loc = -1;
711 
712  {
714 
715  // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
716  // pointer to the original zipped chunk
717  // its index in the original unsorted offsets lists
718  //
719  // Actually there are situations in which copying the buffer is not
720  // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
721  // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
722  // better to be done in the main thread.
723 
724  // And now loc is the position of the chunk in the array of the sorted chunks
725  Int_t myCycle = fCycle;
726 
727  if (fParallel && !fIsLearning) {
728 
729  if(fNseekMax < fNseek){
730  if (gDebug > 0)
731  Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
732 
733  Byte_t *aUnzipStatus = new Byte_t[fNseek];
734  memset(aUnzipStatus, 0, fNseek*sizeof(Byte_t));
735 
736  Int_t *aUnzipLen = new Int_t[fNseek];
737  memset(aUnzipLen, 0, fNseek*sizeof(Int_t));
738 
739  char **aUnzipChunks = new char *[fNseek];
740  memset(aUnzipChunks, 0, fNseek*sizeof(char *));
741 
742  for (Int_t i = 0; i < fNseekMax; i++) {
743  aUnzipStatus[i] = fUnzipStatus[i];
744  aUnzipLen[i] = fUnzipLen[i];
745  aUnzipChunks[i] = fUnzipChunks[i];
746  }
747 
748  if (fUnzipStatus) delete [] fUnzipStatus;
749  if (fUnzipLen) delete [] fUnzipLen;
750  if (fUnzipChunks) delete [] fUnzipChunks;
751 
752  fUnzipStatus = aUnzipStatus;
753  fUnzipLen = aUnzipLen;
754  fUnzipChunks = aUnzipChunks;
755 
756  fNseekMax = fNseek;
757  }
758 
760  if ( (fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc]) ) {
761 
762  // The buffer is, at minimum, in the file cache. We must know its index in the requests list
763  // In order to get its info
764  Int_t seekidx = fSeekIndex[loc];
765 
766  fLastReadPos = seekidx;
767 
768  do {
769 
770  // If the block is ready we get it immediately.
771  // And also we don't have to alloc the blks. This is supposed to be
772  // the main thread of the app.
773  if ((fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0)) {
774 
775  //if (gDebug > 0)
776  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheHIT Block wanted: %d len:%d req_len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], len, fNseek);
777 
778  if(!(*buf)) {
779  *buf = fUnzipChunks[seekidx];
780  fUnzipChunks[seekidx] = 0;
781  fTotalUnzipBytes -= fUnzipLen[seekidx];
783  *free = kTRUE;
784  }
785  else {
786  memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
787  delete fUnzipChunks[seekidx];
788  fTotalUnzipBytes -= fUnzipLen[seekidx];
789  fUnzipChunks[seekidx] = 0;
791  *free = kFALSE;
792  }
793 
794  fNFound++;
795 
796  return fUnzipLen[seekidx];
797  }
798 
799  // If the status of the unzipped chunk is pending
800  // we wait on the condvar, hoping that the next signal is the good one
801  if ( fUnzipStatus[seekidx] == 1 ) {
802  //fMutexList->UnLock();
804  //fMutexList->Lock();
805 
806  if ( myCycle != fCycle ) {
807  if (gDebug > 0)
808  Info("GetUnzipBuffer", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
810 
811  fLastReadPos = 0;
812 
813  seekidx = -1;
814  break;
815  }
816 
817  }
818 
819  } while ( fUnzipStatus[seekidx] == 1 );
820 
821  //if (gDebug > 0)
822  // Info("GetUnzipBuffer", "------- Block wanted: %d status: %d len: %d chunk: %llx ", seekidx, fUnzipStatus[seekidx], fUnzipLen[seekidx], fUnzipChunks[seekidx]);
823 
824  // Here the block is not pending. It could be done or aborted or not yet being processed.
825  if ( (seekidx >= 0) && (fUnzipStatus[seekidx] == 2) && (fUnzipChunks[seekidx]) && (fUnzipLen[seekidx] > 0) ) {
826 
827  //if (gDebug > 0)
828  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheLateHIT Block wanted: %d len:%d fNseek:%d", seekidx, fUnzipLen[seekidx], fNseek);
829 
830  if(!(*buf)) {
831  *buf = fUnzipChunks[seekidx];
832  fUnzipChunks[seekidx] = 0;
833  fTotalUnzipBytes -= fUnzipLen[seekidx];
835  *free = kTRUE;
836  }
837  else {
838  memcpy(*buf, fUnzipChunks[seekidx], fUnzipLen[seekidx]);
839  delete fUnzipChunks[seekidx];
840  fTotalUnzipBytes -= fUnzipLen[seekidx];
841  fUnzipChunks[seekidx] = 0;
843  *free = kFALSE;
844  }
845 
846  fNStalls++;
847 
848  return fUnzipLen[seekidx];
849  }
850  else {
851  // This is a complete miss. We want to avoid the threads
852  // to try unzipping this block in the future.
853  fUnzipStatus[seekidx] = 2;
854  fUnzipChunks[seekidx] = 0;
855 
858 
859  //if (gDebug > 0)
860  // Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS Block wanted: %d len:%d fNseek:%d", seekidx, len, fNseek);
861  }
862 
863  } else {
864  loc = -1;
865  //fLastReadPos = 0;
867  }
868 
869  } else {
870  // We need to reset it for new transferences...
871  //ResetCache();
872  //TFileCacheRead::Prefetch(0,0);
873  }
874 
875  } // scope of the lock!
876 
877  if (len > fCompBufferSize) {
878  delete [] fCompBuffer;
879  fCompBuffer = new char[len];
880  fCompBufferSize = len;
881  } else {
882  if (fCompBufferSize > len*4) {
883  delete [] fCompBuffer;
884  fCompBuffer = new char[len*2];
885  fCompBufferSize = len*2;
886  }
887  }
888 
889  {
891  // Here we know that the async unzip of the wanted chunk
892  // was not done for some reason. We continue.
893 
894  res = 0;
895  if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
896  //Info("GetUnzipBuffer", "++++++++++++++++++++ CacheMISS %d %d", loc, fNseek);
897  fFile->Seek(pos);
898  res = fFile->ReadBuffer(fCompBuffer, len);
899  }
900 
901  if (res) res = -1;
902 
903  } // scope of the lock!
904 
905  if (!res) {
906  res = UnzipBuffer(buf, fCompBuffer);
907  *free = kTRUE;
908  }
909 
910  if (!fIsLearning) {
911  fNMissed++;
912  }
913 
914  return res;
915 
916 }
917 
918 ////////////////////////////////////////////////////////////////////////////////
919 /// static function: Sets the unzip relative buffer size
920 
922 {
923  fgRelBuffSize = relbufferSize;
924 }
925 
926 ////////////////////////////////////////////////////////////////////////////////
927 /// Sets the size for the unzipping cache... by default it should be
928 /// two times the size of the prefetching cache
929 
931 {
933 
934  fUnzipBufferSize = bufferSize;
935 }
936 
937 ////////////////////////////////////////////////////////////////////////////////
938 /// Unzips a ROOT specific buffer... by reading the header at the beginning.
939 /// returns the size of the inflated buffer or -1 if error
940 /// Note!! : If *dest == 0 we will allocate the buffer and it will be the
941 /// responsibility of the caller to free it... it is useful for example
942 /// to pass it to the creator of TBuffer
943 /// src is the original buffer with the record (header+compressed data)
944 /// *dest is the inflated buffer (including the header)
945 
947 {
948  Int_t uzlen = 0;
949  Bool_t alloc = kFALSE;
950 
951  // Here we read the header of the buffer
952  const Int_t hlen=128;
953  Int_t nbytes=0, objlen=0, keylen=0;
954  GetRecordHeader(src, hlen, nbytes, objlen, keylen);
955 
956  if (!(*dest)) {
957  /* early consistency check */
958  UChar_t *bufcur = (UChar_t *) (src + keylen);
959  Int_t nin, nbuf;
960  if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
961  Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
962  uzlen = -1;
963  return uzlen;
964  }
965  Int_t l = keylen+objlen;
966  *dest = new char[l];
967  alloc = kTRUE;
968  }
969  // Must unzip the buffer
970  // fSeekPos[ind]; adress of zipped buffer
971  // fSeekLen[ind]; len of the zipped buffer
972  // &fBuffer[fSeekPos[ind]]; memory address
973 
974  // This is similar to TBasket::ReadBasketBuffers
975  Bool_t oldCase = objlen==nbytes-keylen
976  && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel()!=0
977  && fFile->GetVersion()<=30401;
978 
979  if (objlen > nbytes-keylen || oldCase) {
980 
981  // Copy the key
982  memcpy(*dest, src, keylen);
983  uzlen += keylen;
984 
985  char *objbuf = *dest + keylen;
986  UChar_t *bufcur = (UChar_t *) (src + keylen);
987  Int_t nin, nbuf;
988  Int_t nout = 0;
989  Int_t noutot = 0;
990 
991  while (1) {
992  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
993  if (hc!=0) break;
994  if (gDebug > 2)
995  Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
996  nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
997  if (oldCase && (nin > objlen || nbuf > objlen)) {
998  if (gDebug > 2)
999  Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
1000 
1001  //buffer was very likely not compressed in an old version
1002  memcpy( *dest + keylen, src + keylen, objlen);
1003  uzlen += objlen;
1004  return uzlen;
1005  }
1006 
1007  R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
1008 
1009  if (gDebug > 2)
1010  Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
1011  nin, bufcur, nbuf, objbuf, nout);
1012 
1013  if (!nout) break;
1014  noutot += nout;
1015  if (noutot >= objlen) break;
1016  bufcur += nin;
1017  objbuf += nout;
1018  }
1019 
1020  if (noutot != objlen) {
1021  Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
1022  nbytes,keylen,objlen, noutot,nout,nin,nbuf);
1023  uzlen = -1;
1024  if(alloc) delete [] *dest;
1025  *dest = 0;
1026  return uzlen;
1027  }
1028  uzlen += objlen;
1029  } else {
1030  memcpy(*dest, src, keylen);
1031  uzlen += keylen;
1032  memcpy(*dest + keylen, src + keylen, objlen);
1033  uzlen += objlen;
1034  }
1035  return uzlen;
1036 }
1037 
1038 ////////////////////////////////////////////////////////////////////////////////
1039 /// This inflates the buffers in the cache, passing the data of each block to a
1040 /// new buffer that will only wait there to be read. Each call claims at most one block.
1041 /// We cannot inflate all the buffers in the cache, so we will try to do
1042 /// it until the cache gets full; the member fUnzipBufferSize
1043 /// tells us the maximum size we can allocate for this cache.
1044 ///
1045 /// Note that we unzip in the order the blocks were put into the cache, not
1046 /// the order of the transfer, so they have to be read in that order or the
1047 /// pre-unzipping will be useless.
1048 ///
1049 /// \param startindex index at which the search for a block to unzip starts; updated by this call
1050 /// \param locbuffsz,locbuff size of and pointer to the temporary read buffer; reallocated here when too small or much too large
1051 /// \return 0 in normal conditions, -1 if the read failed, 1 if it would like to sleep
1052 ///
1053 /// This function is supposed to compete among an indefinite number of threads to get a chunk to inflate,
1054 /// in order to accommodate multiple unzippers.
1055 /// Since everything is so asynchronous, we cannot use a fixed buffer; we are forced to keep
1056 /// the individual chunks as separate blocks, whose summed size does not exceed the maximum
1057 /// allowed. The pointers are kept globally in the array fUnzipChunks.
1058 
1059 Int_t TTreeCacheUnzip::UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
1060 {
1061  Int_t myCycle; // value of fCycle when the block is claimed; re-compared after the read and after the unzip
1062  const Int_t hlen=128; // passed as maxbytes to GetRecordHeader()
1063  Int_t objlen=0, keylen=0;
1064  Int_t nbytes=0;
1065  Int_t readbuf = 0; // result of ReadBufferExt(); a value <= 0 is handled as a failed read
1066  // NOTE(review): this listing skips some of its own line numbers (e.g. 1071, 1087, 1147, 1201, 1232, 1239); whatever is on those lines is not visible here and must be checked against the original file.
1067  Int_t idxtounzip = -1; // index of the block claimed by this call, -1 while none has been found
1068  Long64_t rdoffs = 0;
1069  Int_t rdlen = 0;
1070  {
1072  // NOTE(review): listing line 1071 is absent; the "lock scope" remark on the closing brace suggests a lock guard (presumably R__LOCKGUARD) is taken here -- confirm
1073  if (!IsActiveThread() || !fNseek || fIsLearning || !fIsTransferred) {
1074  if (gDebug > 0)
1075  Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1077  return 1;
1078  }
1079 
1080  // To synchronize with the 'paging'
1081  myCycle = fCycle;
1082 
1083  // Try to look for a blk to unzip
1084  idxtounzip = -1;
1085  rdoffs = 0;
1086  rdlen = 0;
1088 
1089  if (fBlocksToGo > 0) {
1090  for (Int_t ii=0; ii < fNseek; ii++) {
1091  Int_t reqi = (startindex+ii) % fNseek; // circular scan over all fNseek slots, beginning at startindex
1092  if (!fUnzipStatus[reqi] && (fSeekLen[reqi] > 256) ) { // status 0 and more than 256 bytes long
1093  // We found a chunk which is not unzipped nor pending
1094  fUnzipStatus[reqi] = 1; // Set it as pending
1095  idxtounzip = reqi;
1096 
1097  rdoffs = fSeek[idxtounzip];
1098  rdlen = fSeekLen[idxtounzip];
1099  break;
1100  }
1101  }
1102  if (idxtounzip < 0) fBlocksToGo = 0; // the whole scan found no candidate
1103  }
1104  } // NOTE(review): closes a block whose opening line (listing line 1087) is absent from this listing
1105 
1106  } // lock scope
1107 
1108  if (idxtounzip < 0) {
1109  if (gDebug > 0)
1110  Info("UnzipCache", "Nothing to do... startindex:%d fTotalUnzipBytes:%lld fUnzipBufferSize:%lld fNseek:%d",
1111  startindex, fTotalUnzipBytes, fUnzipBufferSize, fNseek );
1112  return 1;
1113  }
1114 
1115  // And here we have a new blk to unzip
1116  startindex = idxtounzip+THREADCNT; // the caller's next search starts THREADCNT slots after the claimed one
1117 
1118  if (!IsActiveThread() || !fNseek || fIsLearning ) {
1119  if (gDebug > 0)
1120  Info("UnzipCache", "Sudden Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1122  return 1;
1123  }
1124 
1125  Int_t loc = -1; // handed by reference to ReadBufferExt(); afterwards only used in the diagnostic message
1126 
1127  // Make the caller-held temporary buffer fit: grow it to rdlen when too small, shrink it to 2*rdlen when more than 3*rdlen
1128  if(locbuffsz < rdlen) {
1129  if (locbuff) delete [] locbuff;
1130  locbuffsz = rdlen;
1131  locbuff = new char[locbuffsz];
1132  //memset(locbuff, 0, locbuffsz);
1133  } else
1134  if(locbuffsz > rdlen*3) {
1135  if (locbuff) delete [] locbuff;
1136  locbuffsz = rdlen*2;
1137  locbuff = new char[locbuffsz];
1138  //memset(locbuff, 0, locbuffsz);
1139  }
1140 
1141  if (gDebug > 0)
1142  Info("UnzipCache", "Going to unzip block %d", idxtounzip);
1143 
1144  readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
1145 
1146  {
1148  // NOTE(review): listing line 1147 is absent; the "Scope of the lock" remark on the closing brace suggests a lock guard is taken here -- confirm
1149  if ( (myCycle != fCycle) || !fIsTransferred ) { // fCycle changed or the cache content is no longer valid since the block was claimed
1150  if (gDebug > 0)
1151  Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1153 
1154  fUnzipStatus[idxtounzip] = 2; // Status 2 with a null chunk: no longer pending, but no unzipped data available
1155  fUnzipChunks[idxtounzip] = 0;
1156  fUnzipLen[idxtounzip] = 0;
1158 
1159  startindex = 0;
1160  return 1;
1161  }
1162 
1163  if (readbuf <= 0) {
1164  fUnzipStatus[idxtounzip] = 2; // Status 2 with a null chunk: no longer pending, but no unzipped data available
1165  fUnzipChunks[idxtounzip] = 0;
1166  fUnzipLen[idxtounzip] = 0;
1167  if (gDebug > 0)
1168  Info("UnzipCache", "Block %d not done. rdoffs=%lld rdlen=%d readbuf=%d", idxtounzip, rdoffs, rdlen, readbuf);
1169  return -1;
1170  }
1171 
1172  GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
1173 
1174  Int_t len = (objlen > nbytes-keylen)? keylen+objlen : nbytes; // i.e. the larger of keylen+objlen and nbytes
1175 
1176  // If the single unzipped chunk is really too big, reset it to not processable
1177  // I.e. mark it as done but set the pointer to 0
1178  // This block will be unzipped synchronously in the main thread
1179  if (len > 4*fUnzipBufferSize) {
1180 
1181  //if (gDebug > 0)
1182  Info("UnzipCache", "Block %d is too big, skipping.", idxtounzip);
1183 
1184  fUnzipStatus[idxtounzip] = 2; // Set it as done
1185  fUnzipChunks[idxtounzip] = 0;
1186  fUnzipLen[idxtounzip] = 0;
1187 
1189  return 0;
1190  }
1191 
1192  } // Scope of the lock
1193 
1194  // Unzip it into a new blk
1195  char *ptr = 0; // receives the buffer produced by UnzipBuffer()
1196  Int_t loclen = 0;
1197 
1198  loclen = UnzipBuffer(&ptr, locbuff);
1199 
1200  if ((loclen > 0) && (loclen == objlen+keylen)) { // accept the result only if its length matches the record header
1202 
1203  if ( (myCycle != fCycle) || !fIsTransferred) { // same staleness check as after the read
1204  if (gDebug > 0)
1205  Info("UnzipCache", "Sudden paging Break!!! IsActiveThread(): %d, fNseek: %d, fIsLearning:%d",
1207  delete [] ptr;
1208 
1209  fUnzipStatus[idxtounzip] = 2; // Status 2 with a null chunk: no longer pending, but no unzipped data available
1210  fUnzipChunks[idxtounzip] = 0;
1211  fUnzipLen[idxtounzip] = 0;
1212 
1213  startindex = 0;
1215  return 1;
1216  }
1217 
1218  fUnzipStatus[idxtounzip] = 2; // Set it as done
1219  fUnzipChunks[idxtounzip] = ptr; // the unzipped buffer is published in the chunk array
1220  fUnzipLen[idxtounzip] = loclen;
1221  fTotalUnzipBytes += loclen;
1222 
1223  fActiveBlks.push(idxtounzip);
1224 
1225  if (gDebug > 0)
1226  Info("UnzipCache", "reqi:%d, rdoffs:%lld, rdlen: %d, loclen:%d",
1227  idxtounzip, rdoffs, rdlen, loclen);
1228 
1229  fNUnzip++;
1230  }
1231  else {
1233  Info("argh", "loclen:%d objlen:%d loc:%d readbuf:%d", loclen, objlen, loc, readbuf);
1234  fUnzipStatus[idxtounzip] = 2; // Set it as done
1235  fUnzipChunks[idxtounzip] = 0;
1236  fUnzipLen[idxtounzip] = 0;
1237  }
1238 
1240 
1241  delete [] ptr; // NOTE(review): on the success path ptr was stored in fUnzipChunks[idxtounzip] and no visible line resets it, so this looks like it frees a published chunk -- confirm against the absent listing line 1239
1242  return 0;
1243 }
1244 
1245 void TTreeCacheUnzip::Print(Option_t* option) const {
1246  // Print the unzip-specific counters of this cache, followed by the TTreeCache statistics.
1247  printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
1248  printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
1249  printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
1250  printf("Number of hits: %d\n", fNFound);
1251  printf("Number of stalls: %d\n", fNStalls);
1252  printf("Number of misses: %d\n", fNMissed);
1253 
1254  TTreeCache::Print(option); // option is forwarded unchanged to the base class
1255 }
1256 
1257 ////////////////////////////////////////////////////////////////////////////////
1258 
1261  return TTreeCache::ReadBufferExt(buf, pos, len, loc);
1262 
1263 }
TCondition * fUnzipStartCondition
void Print(Option_t *option="") const
Print cache statistics.
static Int_t SetCancelDeferred()
Static method to set the cancellation response type of the calling thread to deferred, i.e.
Definition: TThread.cxx:653
Int_t fNtot
Total size of prefetched blocks.
Definition: TMutex.h:34
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:282
Long64_t fEntryMax
first entry in the cache
Definition: TTreeCache.h:41
Long64_t GetNextEntry()
Definition: TTree.h:282
TFile * fFile
Pointer to file.
long long Long64_t
Definition: RtypesCore.h:69
Int_t StopThreadUnzip()
To stop the thread we only need to change the value of the variable fActiveThread to false and the lo...
short Version_t
Definition: RtypesCore.h:61
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
Definition: TBranch.h:176
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
float Float_t
Definition: RtypesCore.h:53
Int_t fNStalls
number of blocks that were found in the cache
TObjArray * fBranches
Definition: TTreeCache.h:48
const char Option_t
Definition: RtypesCore.h:62
TEventList * GetEventList() const
Definition: TTree.h:396
virtual Long64_t GetReadEntry() const
Definition: TTree.h:428
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:34
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
Definition: TFile.cxx:2085
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
Byte_t * fUnzipStatus
[fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1596
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
Definition: TTreeCache.h:40
Basic string class.
Definition: TString.h:137
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
static Int_t SetCancelOn()
Static method to turn on thread cancellation.
Definition: TThread.cxx:633
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Definition: TBranch.cxx:1144
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
TTree * GetTree() const
Definition: TTreeCache.h:88
Long64_t * GetBasketEntry() const
Definition: TBranch.h:149
TThread * fUnzipThread[10]
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
Definition: TTreeCache.cxx:330
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
virtual void Print(Option_t *option="") const
Print cache statistics.
Definition: TTreeCache.cxx:994
Int_t fNFound
number of blocks that were unzipped
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure...
TChain chain("h42")
const char * Data() const
Definition: TString.h:349
std::queue< Int_t > fActiveBlks
number of blocks that were not found in the cache and were unzipped
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * GetTreeOffset() const
Definition: TChain.h:119
Helper class to iterate over cluster of baskets.
Definition: TTree.h:249
static void * UnzipLoop(void *arg)
This is a static function.
#define THREADCNT
virtual Int_t GetTreeNumber() const
Definition: TChain.h:118
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
unsigned char Byte_t
Definition: RtypesCore.h:60
void Class()
Definition: Class.C:29
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
number of hits which caused a stall
Int_t * GetBasketBytes() const
Definition: TBranch.h:148
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:169
int d
Definition: tornado.py:11
void Init(TClassEdit::TInterpreterLookupHelper *helper)
Definition: TClassEdit.cxx:118
virtual Int_t GetBufferSize() const
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
Definition: TTree.cxx:4976
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
static Long_t SelfId()
Static method returning the id for the current thread.
Definition: TThread.cxx:538
Specialization of TTreeCache for parallel Unzipping.
Int_t Broadcast()
Definition: TCondition.h:58
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
Int_t Run(void *arg=0)
Start the thread.
Definition: TThread.cxx:552
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
list of branch names in the cache
Definition: TTreeCache.h:50
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
virtual TFile * GetFile() const
Definition: TDirectory.h:155
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:91
static Double_t fgRelBuffSize
Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SendUnzipStartSignal(Bool_t broadcast)
This will send the signal corresponding to the queue...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
TThread * t[5]
Definition: threadsh1.C:13
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
Int_t fNseekMax
The total sum of the currently unzipped blks.
unsigned int UInt_t
Definition: RtypesCore.h:42
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
short Short_t
Definition: RtypesCore.h:35
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:33
TLine * l
Definition: textangle.C:4
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Long_t Join(void **ret=0)
Join this thread.
Definition: TThread.cxx:499
Long64_t entry
tuple free
Definition: fildir.py:30
Int_t fNReadPref
Definition: TTreeCache.h:47
Long64_t fEntryCurrent
last entry in the cache
Definition: TTreeCache.h:42
tuple tree
Definition: tree.py:24
Bool_t IsQueueEmpty()
It says if the queue is empty... useful to see if we have to process it.
virtual Int_t GetSize() const
Definition: TCollection.h:95
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
double Double_t
Definition: RtypesCore.h:55
Bool_t fIsLearning
pointer to the current Tree
Definition: TTreeCache.h:51
Long64_t fUnzipBufferSize
fNseek can change so we need to know its max size
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
TCondition * fUnzipDoneCondition
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
#define R__LOCKGUARD(mutex)
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
static TTreeCacheUnzip::EParUnzipMode fgParallel
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char ** fUnzipChunks
[fNseek] Length of the unzipped buffers
TDirectory * GetDirectory() const
Definition: TBranch.h:157
Int_t fNbranches
next entry number where cache must be filled
Definition: TTreeCache.h:44
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Bool_t IsActiveThread()
This indicates if the thread is active in this moment...
Int_t GetMaxBaskets() const
Definition: TBranch.h:179
Int_t StartThreadUnzip(Int_t nthreads)
The Thread is only a part of the TTreeCache but it is the part that waits for info in the queue and p...
#define dest(otri, vertexptr)
Definition: triangle.c:1040
Int_t GetVersion() const
Definition: TFile.h:205
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
Definition: TChain.h:35
ClassImp(TTreeCacheUnzip) TTreeCacheUnzip
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
Int_t TimedWaitRelative(ULong_t ms)
Wait to be signaled or till the timer times out.
Definition: TCondition.cxx:114
virtual Long64_t GetEntries() const
Definition: TTree.h:386
A TTree object has a header with a name and a title.
Definition: TTree.h:98
unsigned char UChar_t
Definition: RtypesCore.h:34
A TTree is a list of TBranches.
Definition: TBranch.h:58
Long64_t fEntryNext
current lowest entry number in the cache
Definition: TTreeCache.h:43
Long64_t fTotalUnzipBytes
[fNSeek] For each blk, tells us if it's unzipped or pending
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff)
This inflates all the buffers in the cache.
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition: TMath.h:944
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2360
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:4955
Int_t fNseek
Number of blocks to be prefetched.
Int_t Signal()
Definition: TCondition.h:57
int ii
Definition: hprod.C:34
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904