Logo ROOT   6.14/05
Reference Guide
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1 // @(#)root/tree:$Id$
2 // Author: Leandro Franco 10/04/2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TTreeCacheUnzip
13 \ingroup tree
14 
15 Specialization of TTreeCache for parallel Unzipping.
16 
17 Fabrizio Furano (CERN) Aug 2009
18 Core TTree-related code borrowed from the previous version
19  by Leandro Franco and Rene Brun
20 
21 ## Parallel Unzipping
22 
23 TTreeCache has been specialised so that additional threads are
24 free to unzip its content in advance. In this implementation we
25 support up to 10 threads, but right now it makes more sense to
26 limit their number to 1-2.
27 
28 The application reading data is carefully synchronized, in order to:
29  - if the block it wants is not unzipped, it self-unzips it without
30  waiting
31  - if the block is being unzipped in parallel, it waits only
32  for that unzip to finish
33  - if the block has already been unzipped, it takes it
34 
35 This is meant to hide part of the unzipping latency, at the
36 expense of CPU time.
37 
38 The default parameters are the same as in the previous version, i.e. 20%
39 of the TTreeCache buffer size. To change it, use
40 TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)
41 where bufferSize must be passed in bytes.
42 */
43 
44 #include "TTreeCacheUnzip.h"
45 #include "TBranch.h"
46 #include "TChain.h"
47 #include "TEnv.h"
48 #include "TEventList.h"
49 #include "TFile.h"
50 #include "TMath.h"
51 #include "TMutex.h"
52 #include "TROOT.h"
53 #include "TVirtualMutex.h"
54 
55 #ifdef R__USE_IMT
56 #include "ROOT/TThreadExecutor.hxx"
57 #endif
58 
59 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
60 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
61 
63 
64 // The unzip cache does not consume memory by itself, it just allocates in advance
65 // mem blocks which are then picked as they are by the baskets.
66 // Hence there is no good reason to limit it too much
68 
70 
71 ////////////////////////////////////////////////////////////////////////////////
72 /// Clear all baskets' state arrays.
73 
75  for (Int_t i = 0; i < size; i++) {
76  if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
77  if (fUnzipChunks) {
78  if (fUnzipChunks[i]) fUnzipChunks[i].reset();
79  }
80  if (fUnzipStatus) fUnzipStatus[i].store(0);
81  }
82 }
83 
84 ////////////////////////////////////////////////////////////////////////////////
85 
87  return fUnzipStatus[index].load() == kUntouched;
88 }
89 
90 ////////////////////////////////////////////////////////////////////////////////
91 
93  return fUnzipStatus[index].load() == kProgress;
94 }
95 
96 ////////////////////////////////////////////////////////////////////////////////
97 
99  return fUnzipStatus[index].load() == kFinished;
100 }
101 
102 ////////////////////////////////////////////////////////////////////////////////
103 /// Check if the basket is unzipped already. We must make sure the length in
104 /// fUnzipLen is larger than 0.
105 
107  return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
108 }
109 
110 ////////////////////////////////////////////////////////////////////////////////
111 /// Reset all baskets' state arrays. This function is only called by main
112 /// thread and parallel processing from upper layers should be disabled such
113 /// as IMT in TTree::GetEntry(). Other threads should not call this function
114 /// since it is not thread-safe.
115 
117  std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
118  std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
119  std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
120  memset(aUnzipStatus, 0, newSize*sizeof(std::atomic<Byte_t>));
121 
122  for (Int_t i = 0; i < oldSize; i++) {
123  aUnzipLen[i] = fUnzipLen[i];
124  aUnzipChunks[i] = std::move(fUnzipChunks[i]);
125  aUnzipStatus[i].store(fUnzipStatus[i].load());
126  }
127 
128  if (fUnzipChunks) delete [] fUnzipChunks;
129  if (fUnzipStatus) delete [] fUnzipStatus;
130 
131  fUnzipLen = aUnzipLen;
132  fUnzipChunks = aUnzipChunks;
133  fUnzipStatus = aUnzipStatus;
134 }
135 
136 ////////////////////////////////////////////////////////////////////////////////
137 /// Set cache as finished.
138 /// There are three scenarios that a basket is set as finished:
139 /// 1. The basket has already been unzipped.
140 /// 2. The thread is aborted from unzipping process.
141 /// 3. To avoid other tasks/threads work on this basket,
142 /// main thread marks the basket as finished and designates itself
143 /// to unzip this basket.
144 
146  fUnzipLen[index] = 0;
147  fUnzipChunks[index].reset();
148  fUnzipStatus[index].store((Byte_t)kFinished);
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 
154  fUnzipChunks[index].reset();
155  fUnzipStatus[index].store((Byte_t)kFinished);
156 }
157 
158 ////////////////////////////////////////////////////////////////////////////////
159 
161  // Update status array at the very end because we need to be synchronous with the main thread.
162  fUnzipLen[index] = len;
163  fUnzipChunks[index].reset(buf);
164  fUnzipStatus[index].store((Byte_t)kFinished);
165 }
166 
167 ////////////////////////////////////////////////////////////////////////////////
168 /// Start unzipping the basket if it is untouched yet.
169 
171  Byte_t oldValue = kUntouched;
172  Byte_t newValue = kProgress;
173  return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
174 }
175 
176 ////////////////////////////////////////////////////////////////////////////////
177 
180  fEmpty(kTRUE),
181  fCycle(0),
182  fNseekMax(0),
183  fUnzipGroupSize(0),
184  fUnzipBufferSize(0),
185  fNFound(0),
186  fNMissed(0),
187  fNStalls(0),
188  fNUnzip(0)
189 {
190  // Default Constructor.
191  Init();
192 }
193 
194 ////////////////////////////////////////////////////////////////////////////////
195 /// Constructor.
196 
197 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
199  fEmpty(kTRUE),
200  fCycle(0),
201  fNseekMax(0),
202  fUnzipGroupSize(0),
203  fUnzipBufferSize(0),
204  fNFound(0),
205  fNMissed(0),
206  fNStalls(0),
207  fNUnzip(0)
208 {
209  Init();
210 }
211 
212 ////////////////////////////////////////////////////////////////////////////////
213 /// Initialization procedure common to all the constructors.
214 
216 {
217 #ifdef R__USE_IMT
218  fUnzipTaskGroup.reset();
219 #endif
220  fIOMutex = new TMutex(kTRUE);
221 
222  fCompBuffer = new char[16384];
223  fCompBufferSize = 16384;
224 
225  fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
226 
227  if (fgParallel == kDisable) {
228  fParallel = kFALSE;
229  }
230  else if(fgParallel == kEnable || fgParallel == kForce) {
232 
233  if(gDebug > 0)
234  Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
235 
236  fParallel = kTRUE;
237 
238  }
239  else {
240  Warning("TTreeCacheUnzip", "Parallel Option unknown");
241  }
242 
243  // Check if asynchronous reading is supported by this TFile specialization
244  if (gEnv->GetValue("TFile.AsyncReading", 1)) {
245  if (fFile && !(fFile->ReadBufferAsync(0, 0)))
247  }
248 
249 }
250 
251 ////////////////////////////////////////////////////////////////////////////////
252 /// Destructor. (in general called by the TFile destructor)
253 
255 {
256  ResetCache();
257  delete fIOMutex;
259 }
260 
261 ////////////////////////////////////////////////////////////////////////////////
262 /// Add a branch to the list of branches to be stored in the cache
263 /// this function is called by TBranch::GetBasket
264 /// Returns:
265 /// - 0 branch added or already included
266 /// - -1 on error
267 
268 Int_t TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
269 {
270  return TTreeCache::AddBranch(b, subbranches);
271 }
272 
273 ////////////////////////////////////////////////////////////////////////////////
274 /// Add a branch to the list of branches to be stored in the cache
275 /// this function is called by TBranch::GetBasket
276 /// Returns:
277 /// - 0 branch added or already included
278 /// - -1 on error
279 
280 Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
281 {
282  return TTreeCache::AddBranch(branch, subbranches);
283 }
284 
285 ////////////////////////////////////////////////////////////////////////////////
286 
288 {
289 
290  if (fNbranches <= 0) return kFALSE;
291 
292  // Fill the cache buffer with the branches in the cache.
294 
296  Long64_t entry = tree->GetReadEntry();
297 
298  // If the entry is in the range we previously prefetched, there is
299  // no point in retrying. Note that this will also return false
300  // during the training phase (fEntryNext is then set intentional to
301  // the end of the training phase).
302  if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
303 
304  // Triggered by the user, not the learning phase
305  if (entry == -1) entry = 0;
306 
307  TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
308  fEntryCurrent = clusterIter();
309  fEntryNext = clusterIter.GetNextEntry();
310 
312  if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
314 
315  // Check if owner has a TEventList set. If yes we optimize for this
316  // Special case reading only the baskets containing entries in the
317  // list.
318  TEventList *elist = fTree->GetEventList();
319  Long64_t chainOffset = 0;
320  if (elist) {
321  if (fTree->IsA() ==TChain::Class()) {
322  TChain *chain = (TChain*)fTree;
323  Int_t t = chain->GetTreeNumber();
324  chainOffset = chain->GetTreeOffset()[t];
325  }
326  }
327 
328  //clear cache buffer
330 
331  //store baskets
332  for (Int_t i = 0; i < fNbranches; i++) {
334  if (b->GetDirectory() == 0) continue;
335  if (b->GetDirectory()->GetFile() != fFile) continue;
336  Int_t nb = b->GetMaxBaskets();
337  Int_t *lbaskets = b->GetBasketBytes();
338  Long64_t *entries = b->GetBasketEntry();
339  if (!lbaskets || !entries) continue;
340  //we have found the branch. We now register all its baskets
341  //from the requested offset to the basket below fEntrymax
342  Int_t blistsize = b->GetListOfBaskets()->GetSize();
343  for (Int_t j=0;j<nb;j++) {
344  // This basket has already been read, skip it
345  if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
346 
347  Long64_t pos = b->GetBasketSeek(j);
348  Int_t len = lbaskets[j];
349  if (pos <= 0 || len <= 0) continue;
350  //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
351  if (entries[j] >= fEntryNext) continue;
352  if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
353  if (elist) {
354  Long64_t emax = fEntryMax;
355  if (j < nb - 1) emax = entries[j+1] - 1;
356  if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
357  }
358  fNReadPref++;
359 
360  TFileCacheRead::Prefetch(pos, len);
361  }
362  if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
363  }
364 
365  // Now fix the size of the status arrays
366  ResetCache();
368 
369  return kTRUE;
370 }
371 
372 ////////////////////////////////////////////////////////////////////////////////
373 /// Change the underlying buffer size of the cache.
374 /// Returns:
375 /// - 0 if the buffer content is still available
376 /// - 1 if some or all of the buffer content has been made unavailable
377 /// - -1 on error
378 
380 {
381  Int_t res = TTreeCache::SetBufferSize(buffersize);
382  if (res < 0) {
383  return res;
384  }
386  ResetCache();
387  return 1;
388 }
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 /// Set the minimum and maximum entry number to be processed
392 /// this information helps to optimize the number of baskets to read
393 /// when prefetching the branch buffers.
394 
396 {
397  TTreeCache::SetEntryRange(emin, emax);
398 }
399 
400 ////////////////////////////////////////////////////////////////////////////////
401 /// It's the same as TTreeCache::StopLearningPhase but we guarantee that
402 /// we start the unzipping just after getting the buffers
403 
405 {
407 }
408 
409 ////////////////////////////////////////////////////////////////////////////////
410 ///update pointer to current Tree and recompute pointers to the branches in the cache
411 
413 {
415 }
416 
417 ////////////////////////////////////////////////////////////////////////////////
418 // //
419 // From now on we have the methods concerning the threading part of the cache //
420 // //
421 ////////////////////////////////////////////////////////////////////////////////
422 
423 ////////////////////////////////////////////////////////////////////////////////
424 /// Static function that returns the parallel option
425 /// (to indicate an additional thread)
426 
428 {
429  return fgParallel;
430 }
431 
432 ////////////////////////////////////////////////////////////////////////////////
433 /// Static function that tells whether the multithreading unzipping is activated
434 
436 {
437  if (fgParallel == kEnable || fgParallel == kForce)
438  return kTRUE;
439 
440  return kFALSE;
441 }
442 
443 ////////////////////////////////////////////////////////////////////////////////
444 /// Static function that (de)activates multithreading unzipping
445 ///
446 /// The possible options are:
447 /// - kEnable _Enable_ it, which causes an automatic detection and launches the
448 /// additional thread if the number of cores in the machine is greater than
449 /// one
450 /// - kDisable _Disable_ will not activate the additional thread.
451 /// - kForce _Force_ will start the additional thread even if there is only one
452 /// core. the default will be taken as kEnable.
453 ///
454 /// Returns 0 if there was an error, 1 otherwise.
455 
457 {
459  fgParallel = option;
460  return 1;
461  }
462  return 0;
463 }
464 
465 ////////////////////////////////////////////////////////////////////////////////
466 // //
467 // From now on we have the methods concerning the unzipping part of the cache //
468 // //
469 ////////////////////////////////////////////////////////////////////////////////
470 
471 ////////////////////////////////////////////////////////////////////////////////
472 /// Read the logical record header from the buffer buf.
473 /// This must be a pointer to the header part, not to the object itself, and
474 /// it must contain at least maxbytes bytes of data.
475 /// Returns nread;
476 ///
477 /// In output arguments:
478 ///
479 /// - nbytes : number of bytes in record
480 /// if negative, this is a deleted record
481 /// if 0, cannot read record, wrong value of argument first
482 /// - objlen : uncompressed object size
483 /// - keylen : length of logical record header
484 ///
485 /// Note that the arguments objlen and keylen are returned only
486 /// if maxbytes >=16
487 /// Note: This was adapted from TFile... so some things dont apply
488 
489 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
490 {
491  Version_t versionkey;
492  Short_t klen;
493  UInt_t datime;
494  Int_t nb = 0, olen;
495  Int_t nread = maxbytes;
496  frombuf(buf, &nb);
497  nbytes = nb;
498  if (nb < 0) return nread;
499  // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
500  const Int_t headerSize = 16;
501  if (nread < headerSize) return nread;
502  frombuf(buf, &versionkey);
503  frombuf(buf, &olen);
504  frombuf(buf, &datime);
505  frombuf(buf, &klen);
506  if (!olen) olen = nbytes - klen;
507  objlen = olen;
508  keylen = klen;
509  return nread;
510 }
511 
512 ////////////////////////////////////////////////////////////////////////////////
513 /// This will delete the list of buffers that are in the unzipping cache
514 /// and will reset certain values in the cache.
515 /// This name is ambiguous because the method doesn't reset the whole cache,
516 /// only the part related to the unzipping
517 /// Note: This method is completely different from TTreeCache::ResetCache(),
518 /// in that method we were cleaning the prefetching buffer while here we
519 /// delete the information about the unzipped buffers
520 
522 {
523  // Reset all the lists and wipe all the chunks
524  fCycle++;
526 
527  if(fNseekMax < fNseek){
528  if (gDebug > 0)
529  Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
530 
532  fNseekMax = fNseek;
533  }
534  fEmpty = kTRUE;
535 }
536 
537 ////////////////////////////////////////////////////////////////////////////////
538 /// This inflates a basket in the cache.. passing the data to a new
539 /// buffer that will only wait there to be read...
540 /// This function is responsible to update corresponding elements in
541 /// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
542 /// in fUnzipStatus to exclusively unzip the basket, we must update
543 /// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
544 /// and fUnzipLen are ready before main thread fetch the data.
545 
547 {
548  Int_t myCycle;
549  const Int_t hlen = 128;
550  Int_t objlen = 0, keylen = 0;
551  Int_t nbytes = 0;
552  Int_t readbuf = 0;
553 
554  Long64_t rdoffs = 0;
555  Int_t rdlen = 0;
556 
557  // To synchronize with the 'paging'
558  myCycle = fCycle;
559  rdoffs = fSeek[index];
560  rdlen = fSeekLen[index];
561 
562  Int_t loc = -1;
563  if (!fNseek || fIsLearning) {
564  return 1;
565  }
566 
567  if ((myCycle != fCycle) || !fIsTransferred) {
568  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
569  return 1;
570  }
571 
572  // Prepare a memory buffer of adequate size
573  char* locbuff = 0;
574  if (rdlen > 16384) {
575  locbuff = new char[rdlen];
576  } else if (rdlen * 3 < 16384) {
577  locbuff = new char[rdlen * 2];
578  } else {
579  locbuff = new char[16384];
580  }
581 
582  readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
583 
584  if (readbuf <= 0) {
585  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
586  if (locbuff) delete [] locbuff;
587  return -1;
588  }
589 
590  GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
591 
592  Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
593  // If the single unzipped chunk is really too big, reset it to not processable
594  // I.e. mark it as done but set the pointer to 0
595  // This block will be unzipped synchronously in the main thread
596  // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
597  if (len > 4 * fUnzipBufferSize) {
598  if (gDebug > 0)
599  Info("UnzipCache", "Block %d is too big, skipping.", index);
600 
601  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
602  if (locbuff) delete [] locbuff;
603  return 0;
604  }
605 
606  // Unzip it into a new blk
607  char *ptr = 0;
608  Int_t loclen = UnzipBuffer(&ptr, locbuff);
609  if ((loclen > 0) && (loclen == objlen + keylen)) {
610  if ((myCycle != fCycle) || !fIsTransferred) {
611  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
612  if (locbuff) delete [] locbuff;
613  return 1;
614  }
615  fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
616  fNUnzip++;
617  } else {
618  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
619  }
620 
621  if (locbuff) delete [] locbuff;
622  return 0;
623 }
624 
625 #ifdef R__USE_IMT
626 ////////////////////////////////////////////////////////////////////////////////
627 /// We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total)
628 /// to a task. In TTaskGroup, we use TThreadExecutor to do the actually work of unzipping
629 /// a group of basket. The purpose of creating TTaskGroup is to avoid competing with main thread.
630 
632 {
633  auto mapFunction = [&]() {
634  auto unzipFunction = [&](const std::vector<Int_t> &indices) {
635  // If cache is invalidated and we should return immediately.
636  if (!fIsTransferred) return nullptr;
637 
638  for (auto ii : indices) {
639  if(fUnzipState.TryUnzipping(ii)) {
640  Int_t res = UnzipCache(ii);
641  if(res)
642  if (gDebug > 0)
643  Info("UnzipCache", "Unzipping failed or cache is in learning state");
644  }
645  }
646  return nullptr;
647  };
648 
649  Int_t accusz = 0;
650  std::vector<std::vector<Int_t>> basketIndices;
651  std::vector<Int_t> indices;
652  if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
653  for (Int_t i = 0; i < fNseek; i++) {
654  while (accusz < fUnzipGroupSize) {
655  accusz += fSeekLen[i];
656  indices.push_back(i);
657  i++;
658  if (i >= fNseek) break;
659  }
660  if (i < fNseek) i--;
661  basketIndices.push_back(indices);
662  indices.clear();
663  accusz = 0;
664  }
666  pool.Foreach(unzipFunction, basketIndices);
667  };
668 
670  fUnzipTaskGroup->Run(mapFunction);
671 
672  return 0;
673 }
674 #endif
675 
676 ////////////////////////////////////////////////////////////////////////////////
677 /// We try to read a buffer that has already been unzipped
678 /// Returns -1 in case of read failure, 0 in case it's not in the
679 /// cache and n>0 in case read from cache (number of bytes copied).
680 /// pos and len are the original values as were passed to ReadBuffer
681 /// but instead we will return the inflated buffer.
682 /// Note!! : If *buf == 0 we will allocate the buffer and it will be the
683 /// responsibility of the caller to free it... it is useful for example
684 /// to pass it to the creator of TBuffer
685 
687 {
688  Int_t res = 0;
689  Int_t loc = -1;
690 
691  // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
692  // pointer to the original zipped chunk
693  // its index in the original unsorted offsets lists
694  //
695  // Actually there are situations in which copying the buffer is not
696  // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
697  // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
698  // better to be done in the main thread.
699 
700  Int_t myCycle = fCycle;
701 
702  if (fParallel && !fIsLearning) {
703 
704  if(fNseekMax < fNseek){
705  if (gDebug > 0)
706  Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
707 
709  fNseekMax = fNseek;
710  }
711 
713  if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
714 
715  // The buffer is, at minimum, in the file cache. We must know its index in the requests list
716  // In order to get its info
717  Int_t seekidx = fSeekIndex[loc];
718 
719  do {
720 
721  // If the block is ready we get it immediately.
722  // And also we don't have to alloc the blks. This is supposed to be
723  // the main thread of the app.
724  if (fUnzipState.IsUnzipped(seekidx)) {
725  if(!(*buf)) {
726  *buf = fUnzipState.fUnzipChunks[seekidx].get();
727  fUnzipState.fUnzipChunks[seekidx].release();
728  *free = kTRUE;
729  } else {
730  memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
731  fUnzipState.fUnzipChunks[seekidx].reset();
732  *free = kFALSE;
733  }
734 
735  fNFound++;
736  return fUnzipState.fUnzipLen[seekidx];
737  }
738 
739  // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
740  Int_t reqi = -1;
741 
742  if (fUnzipState.IsProgress(seekidx)) {
743  if (fEmpty) {
744  for (Int_t ii = 0; ii < fNseek; ++ii) {
745  Int_t idx = (seekidx + 1 + ii) % fNseek;
746  if (fUnzipState.IsUntouched(idx)) {
747  if(fUnzipState.TryUnzipping(idx)) {
748  reqi = idx;
749  break;
750  }
751  }
752  }
753  if (reqi < 0) {
754  fEmpty = kFALSE;
755  } else {
756  UnzipCache(reqi);
757  }
758  }
759 
760  if ( myCycle != fCycle ) {
761  if (gDebug > 0)
762  Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
764 
765  seekidx = -1;
766  break;
767  }
768  }
769 
770  } while (fUnzipState.IsProgress(seekidx));
771 
772  // Here the block is not pending. It could be done or aborted or not yet being processed.
773  if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
774  if(!(*buf)) {
775  *buf = fUnzipState.fUnzipChunks[seekidx].get();
776  fUnzipState.fUnzipChunks[seekidx].release();
777  *free = kTRUE;
778  } else {
779  memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
780  fUnzipState.fUnzipChunks[seekidx].reset();
781  *free = kFALSE;
782  }
783 
784  fNStalls++;
785  return fUnzipState.fUnzipLen[seekidx];
786  } else {
787  // This is a complete miss. We want to avoid the background tasks
788  // to try unzipping this block in the future.
789  fUnzipState.SetMissed(seekidx);
790  }
791  } else {
792  loc = -1;
794  }
795  }
796 
797  if (len > fCompBufferSize) {
798  if(fCompBuffer) delete [] fCompBuffer;
799  fCompBuffer = new char[len];
800  fCompBufferSize = len;
801  } else {
802  if (fCompBufferSize > len * 4) {
803  if(fCompBuffer) delete [] fCompBuffer;
804  fCompBuffer = new char[len*2];
805  fCompBufferSize = len * 2;
806  }
807  }
808 
809  res = 0;
810  if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
811  // Cache is invalidated and we need to wait for all unzipping tasks to befinished before fill new baskets in cache.
812 #ifdef R__USE_IMT
813  if(fUnzipTaskGroup) {
814  fUnzipTaskGroup->Cancel();
815  fUnzipTaskGroup.reset();
816  }
817 #endif
818  {
819  // Fill new baskets into cache.
821  fFile->Seek(pos);
822  res = fFile->ReadBuffer(fCompBuffer, len);
823  } // end of lock scope
824 #ifdef R__USE_IMT
825  CreateTasks();
826 #endif
827  }
828 
829  if (res) res = -1;
830 
831  if (!res) {
832  res = UnzipBuffer(buf, fCompBuffer);
833  *free = kTRUE;
834  }
835 
836  if (!fIsLearning) {
837  fNMissed++;
838  }
839 
840  return res;
841 }
842 
843 ////////////////////////////////////////////////////////////////////////////////
844 /// static function: Sets the unzip relative buffer size
845 
847 {
848  fgRelBuffSize = relbufferSize;
849 }
850 
851 ////////////////////////////////////////////////////////////////////////////////
852 /// Sets the size for the unzipping cache... by default it should be
853 /// two times the size of the prefetching cache
854 
856 {
857  fUnzipBufferSize = bufferSize;
858 }
859 
860 ////////////////////////////////////////////////////////////////////////////////
861 /// Unzips a ROOT specific buffer... by reading the header at the beginning.
862 /// returns the size of the inflated buffer or -1 if error
863 /// Note!! : If *dest == 0 we will allocate the buffer and it will be the
864 /// responsibility of the caller to free it... it is useful for example
865 /// to pass it to the creator of TBuffer
866 /// src is the original buffer with the record (header+compressed data)
867 /// *dest is the inflated buffer (including the header)
868 
870 {
871  Int_t uzlen = 0;
872  Bool_t alloc = kFALSE;
873 
874  // Here we read the header of the buffer
875  const Int_t hlen = 128;
876  Int_t nbytes = 0, objlen = 0, keylen = 0;
877  GetRecordHeader(src, hlen, nbytes, objlen, keylen);
878 
879  if (!(*dest)) {
880  /* early consistency check */
881  UChar_t *bufcur = (UChar_t *) (src + keylen);
882  Int_t nin, nbuf;
883  if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
884  Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
885  uzlen = -1;
886  return uzlen;
887  }
888  Int_t l = keylen + objlen;
889  *dest = new char[l];
890  alloc = kTRUE;
891  }
892  // Must unzip the buffer
893  // fSeekPos[ind]; adress of zipped buffer
894  // fSeekLen[ind]; len of the zipped buffer
895  // &fBuffer[fSeekPos[ind]]; memory address
896 
897  // This is similar to TBasket::ReadBasketBuffers
898  Bool_t oldCase = objlen == nbytes - keylen
899  && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
900  && fFile->GetVersion() <= 30401;
901 
902  if (objlen > nbytes-keylen || oldCase) {
903 
904  // Copy the key
905  memcpy(*dest, src, keylen);
906  uzlen += keylen;
907 
908  char *objbuf = *dest + keylen;
909  UChar_t *bufcur = (UChar_t *) (src + keylen);
910  Int_t nin, nbuf;
911  Int_t nout = 0;
912  Int_t noutot = 0;
913 
914  while (1) {
915  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
916  if (hc != 0) break;
917  if (gDebug > 2)
918  Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
919  nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
920  if (oldCase && (nin > objlen || nbuf > objlen)) {
921  if (gDebug > 2)
922  Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
923 
924  //buffer was very likely not compressed in an old version
925  memcpy(*dest + keylen, src + keylen, objlen);
926  uzlen += objlen;
927  return uzlen;
928  }
929 
930  R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
931 
932  if (gDebug > 2)
933  Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
934  nin, bufcur, nbuf, objbuf, nout);
935 
936  if (!nout) break;
937  noutot += nout;
938  if (noutot >= objlen) break;
939  bufcur += nin;
940  objbuf += nout;
941  }
942 
943  if (noutot != objlen) {
944  Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
945  nbytes,keylen,objlen, noutot,nout,nin,nbuf);
946  uzlen = -1;
947  if(alloc) delete [] *dest;
948  *dest = 0;
949  return uzlen;
950  }
951  uzlen += objlen;
952  } else {
953  memcpy(*dest, src, keylen);
954  uzlen += keylen;
955  memcpy(*dest + keylen, src + keylen, objlen);
956  uzlen += objlen;
957  }
958  return uzlen;
959 }
960 
961 ////////////////////////////////////////////////////////////////////////////////
962 
963 void TTreeCacheUnzip::Print(Option_t* option) const {
964 
965  printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
966  printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
967  printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
968  printf("Number of hits: %d\n", fNFound);
969  printf("Number of stalls: %d\n", fNStalls);
970  printf("Number of misses: %d\n", fNMissed);
971 
972  TTreeCache::Print(option);
973 }
974 
975 ////////////////////////////////////////////////////////////////////////////////
976 
979  return TTreeCache::ReadBufferExt(buf, pos, len, loc);
980 }
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Int_t fNtot
Total size of prefetched blocks.
Definition: TMutex.h:30
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
Long64_t fEntryMax
! last entry in the cache
Definition: TTreeCache.h:42
Bool_t IsProgress(Int_t index) const
Long64_t * GetBasketEntry() const
Definition: TBranch.h:172
Int_t fNUnzip
! number of blocks that were unzipped
Long64_t GetNextEntry()
Definition: TTree.h:273
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
TFile * fFile
Pointer to file.
long long Long64_t
Definition: RtypesCore.h:69
short Version_t
Definition: RtypesCore.h:61
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache ...
TObjArray * GetListOfBaskets()
Definition: TBranch.h:200
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
float Float_t
Definition: RtypesCore.h:53
Int_t fNStalls
! number of hits which caused a stall
TObjArray * fBranches
! List of branches to be stored in the cache
Definition: TTreeCache.h:54
const char Option_t
Definition: RtypesCore.h:62
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase, but we guarantee that we start the unzipping just after...
A specialized TFileCacheRead object for a TTree.
Definition: TTreeCache.h:35
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition: TFile.cxx:2173
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1670
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
TDirectory * GetDirectory() const
Definition: TBranch.h:180
Long64_t fEntryMin
! first entry in the cache
Definition: TTreeCache.h:41
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase...
void Clear(Int_t size)
Clear all baskets' state arrays.
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by TBranch::Ge...
Definition: TTreeCache.cxx:307
Long64_t * GetTreeOffset() const
Definition: TChain.h:117
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache...
Int_t fNFound
! number of blocks that were found in the cache
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer...
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure...
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
Helper class to iterate over cluster of baskets.
Definition: TTree.h:235
Int_t * GetBasketBytes() const
Definition: TBranch.h:171
Int_t GetVersion() const
Definition: TFile.h:228
void Class()
Definition: Class.C:29
void SetUnzipped(Int_t index, char *buf, Int_t len)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
unsigned char Byte_t
Definition: RtypesCore.h:60
virtual Long64_t GetReadEntry() const
Definition: TTree.h:430
Bool_t fIsTransferred
True when fBuffer contains something valid.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:171
virtual TClusterIterator GetClusterIterator(Long64_t firstentry)
Return an iterator over the cluster of baskets starting at firstentry.
Definition: TTree.cxx:5193
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void Print(Option_t *option="") const
Print cache statistics.
Specialization of TTreeCache for parallel Unzipping.
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
virtual TFile * GetFile() const
Definition: TDirectory.h:147
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
TTree * fTree
! pointer to the current Tree
Definition: TTreeCache.h:56
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
This class provides a simple interface to execute the same task multiple times in parallel...
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task...
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
TTree * GetTree() const
Definition: TTreeCache.h:150
Int_t fNseekMax
! fNseek can change so we need to know its max size
Bool_t IsUntouched(Int_t index) const
unsigned int UInt_t
Definition: RtypesCore.h:42
A class to manage the asynchronous execution of work items.
Definition: TTaskGroup.hxx:21
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
Int_t GetMaxBaskets() const
Definition: TBranch.h:203
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread) ...
short Short_t
Definition: RtypesCore.h:35
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
Int_t fNReadPref
Number of blocks that were prefetched.
Definition: TTreeCache.h:52
virtual Long64_t GetBasketSeek(Int_t basket) const
Return address of basket in the file.
Definition: TBranch.cxx:1253
const Bool_t kFALSE
Definition: RtypesCore.h:88
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition: TTreeCache.h:43
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:89
virtual void Print(Option_t *option="") const
Print cache statistics.
#define ClassImp(name)
Definition: Rtypes.h:359
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by an IMT task
double Double_t
Definition: RtypesCore.h:55
Bool_t fIsLearning
! true if cache is in learning mode
Definition: TTreeCache.h:57
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
virtual Int_t GetTreeNumber() const
Definition: TChain.h:116
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
virtual Int_t GetBufferSize() const
#define free
Definition: civetweb.c:1350
#define R__LOCKGUARD(mutex)
Bool_t IsFinished(Int_t index) const
void Init()
Initialization procedure common to all the constructors.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
Static function: sets the unzip relative buffer size.
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
virtual Long64_t GetEntries() const
Definition: TTree.h:384
Int_t fNbranches
! Number of branches in the cache
Definition: TTreeCache.h:47
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
auto * l
Definition: textangle.C:4
#define dest(otri, vertexptr)
Definition: triangle.c:1040
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Definition: TRolke.cxx:630
R__EXTERN Int_t gDebug
Definition: Rtypes.h:86
Definition: tree.py:1
UnzipState_t fUnzipState
A TTree object has a header with a name and a title.
Definition: TTree.h:70
unsigned char UChar_t
Definition: RtypesCore.h:34
TEventList * GetEventList() const
Definition: TTree.h:394
void SetFinished(Int_t index)
Set cache as finished.
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
A TTree is a list of TBranches.
Definition: TBranch.h:62
Long64_t fEntryNext
! next entry number where cache must be filled
Definition: TTreeCache.h:44
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
const Bool_t kTRUE
Definition: RtypesCore.h:87
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
Definition: TMath.h:1221
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:5080
Int_t fNseek
Number of blocks to be prefetched.