Logo ROOT  
Reference Guide
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "TTreeCacheUnzip.h"
22#include "TBranch.h"
23#include "TChain.h"
24#include "TEnv.h"
25#include "TEventList.h"
26#include "TFile.h"
27#include "TMath.h"
28#include "TMutex.h"
29#include "TROOT.h"
30#include "ROOT/RMakeUnique.hxx"
31
32#ifdef R__USE_IMT
34#include "ROOT/TTaskGroup.hxx"
35#endif
36
37extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
38extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
39
41
42// The unzip cache does not consume memory by itself, it just allocates in advance
43// mem blocks which are then picked as they are by the baskets.
44// Hence there is no good reason to limit it too much
46
48
49////////////////////////////////////////////////////////////////////////////////
50/// Clear all baskets' state arrays.
51
53 for (Int_t i = 0; i < size; i++) {
54 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
55 if (fUnzipChunks) {
56 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
57 }
58 if (fUnzipStatus) fUnzipStatus[i].store(0);
59 }
60}
61
62////////////////////////////////////////////////////////////////////////////////
63
65 return fUnzipStatus[index].load() == kUntouched;
66}
67
68////////////////////////////////////////////////////////////////////////////////
69
71 return fUnzipStatus[index].load() == kProgress;
72}
73
74////////////////////////////////////////////////////////////////////////////////
75
77 return fUnzipStatus[index].load() == kFinished;
78}
79
80////////////////////////////////////////////////////////////////////////////////
81/// Check if the basket is unzipped already. We must make sure the length in
82/// fUnzipLen is larger than 0.
83
85 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
86}
87
88////////////////////////////////////////////////////////////////////////////////
89/// Reset all baskets' state arrays. This function is only called by main
90/// thread and parallel processing from upper layers should be disabled such
91/// as IMT in TTree::GetEntry(). Other threads should not call this function
92/// since it is not thread-safe.
93
95 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
96 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
97 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
98
99 for (Int_t i = 0; i < newSize; ++i)
100 aUnzipStatus[i].store(0);
101
102 for (Int_t i = 0; i < oldSize; i++) {
103 aUnzipLen[i] = fUnzipLen[i];
104 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
105 aUnzipStatus[i].store(fUnzipStatus[i].load());
106 }
107
108 if (fUnzipChunks) delete [] fUnzipChunks;
109 if (fUnzipStatus) delete [] fUnzipStatus;
110
111 fUnzipLen = aUnzipLen;
112 fUnzipChunks = aUnzipChunks;
113 fUnzipStatus = aUnzipStatus;
114}
115
116////////////////////////////////////////////////////////////////////////////////
117/// Set cache as finished.
/// There are three scenarios in which a basket is set as finished:
119/// 1. The basket has already been unzipped.
120/// 2. The thread is aborted from unzipping process.
121/// 3. To avoid other tasks/threads work on this basket,
122/// main thread marks the basket as finished and designates itself
123/// to unzip this basket.
124
126 fUnzipLen[index] = 0;
127 fUnzipChunks[index].reset();
128 fUnzipStatus[index].store((Byte_t)kFinished);
129}
130
131////////////////////////////////////////////////////////////////////////////////
132
134 fUnzipChunks[index].reset();
135 fUnzipStatus[index].store((Byte_t)kFinished);
136}
137
138////////////////////////////////////////////////////////////////////////////////
139
141 // Update status array at the very end because we need to be synchronous with the main thread.
142 fUnzipLen[index] = len;
143 fUnzipChunks[index].reset(buf);
144 fUnzipStatus[index].store((Byte_t)kFinished);
145}
146
147////////////////////////////////////////////////////////////////////////////////
148/// Start unzipping the basket if it is untouched yet.
149
151 Byte_t oldValue = kUntouched;
152 Byte_t newValue = kProgress;
153 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
154}
155
156////////////////////////////////////////////////////////////////////////////////
157
160 fEmpty(kTRUE),
161 fCycle(0),
162 fNseekMax(0),
165 fNFound(0),
166 fNMissed(0),
167 fNStalls(0),
168 fNUnzip(0)
169{
170 // Default Constructor.
171 Init();
172}
173
174////////////////////////////////////////////////////////////////////////////////
175/// Constructor.
176
178 fAsyncReading(kFALSE),
179 fEmpty(kTRUE),
180 fCycle(0),
181 fNseekMax(0),
182 fUnzipGroupSize(0),
183 fUnzipBufferSize(0),
184 fNFound(0),
185 fNMissed(0),
186 fNStalls(0),
187 fNUnzip(0)
188{
189 Init();
190}
191
192////////////////////////////////////////////////////////////////////////////////
193/// Initialization procedure common to all the constructors.
194
196{
197#ifdef R__USE_IMT
198 fUnzipTaskGroup.reset();
199#endif
200 fIOMutex = std::make_unique<TMutex>(kTRUE);
201
202 fCompBuffer = new char[16384];
203 fCompBufferSize = 16384;
204
205 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
206
207 if (fgParallel == kDisable) {
209 }
210 else if(fgParallel == kEnable || fgParallel == kForce) {
212
213 if(gDebug > 0)
214 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
215
217
218 }
219 else {
220 Warning("TTreeCacheUnzip", "Parallel Option unknown");
221 }
222
223 // Check if asynchronous reading is supported by this TFile specialization
224 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
225 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
227 }
228
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Destructor. (in general called by the TFile destructor)
233
235{
236 ResetCache();
238}
239
240////////////////////////////////////////////////////////////////////////////////
241/// Add a branch to the list of branches to be stored in the cache
242/// this function is called by TBranch::GetBasket
243/// Returns:
244/// - 0 branch added or already included
245/// - -1 on error
246
248{
249 return TTreeCache::AddBranch(b, subbranches);
250}
251
252////////////////////////////////////////////////////////////////////////////////
253/// Add a branch to the list of branches to be stored in the cache
254/// this function is called by TBranch::GetBasket
255/// Returns:
256/// - 0 branch added or already included
257/// - -1 on error
258
259Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
260{
261 return TTreeCache::AddBranch(branch, subbranches);
262}
263
264////////////////////////////////////////////////////////////////////////////////
265
267{
268
269 if (fNbranches <= 0) return kFALSE;
270
271 // Fill the cache buffer with the branches in the cache.
273
275 Long64_t entry = tree->GetReadEntry();
276
277 // If the entry is in the range we previously prefetched, there is
278 // no point in retrying. Note that this will also return false
279 // during the training phase (fEntryNext is then set intentional to
280 // the end of the training phase).
281 if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
282
283 // Triggered by the user, not the learning phase
284 if (entry == -1) entry = 0;
285
286 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
287 fEntryCurrent = clusterIter();
288 fEntryNext = clusterIter.GetNextEntry();
289
291 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
293
294 // Check if owner has a TEventList set. If yes we optimize for this
295 // Special case reading only the baskets containing entries in the
296 // list.
297 TEventList *elist = fTree->GetEventList();
298 Long64_t chainOffset = 0;
299 if (elist) {
300 if (fTree->IsA() ==TChain::Class()) {
301 TChain *chain = (TChain*)fTree;
302 Int_t t = chain->GetTreeNumber();
303 chainOffset = chain->GetTreeOffset()[t];
304 }
305 }
306
307 //clear cache buffer
309
310 //store baskets
311 for (Int_t i = 0; i < fNbranches; i++) {
313 if (b->GetDirectory() == 0) continue;
314 if (b->GetDirectory()->GetFile() != fFile) continue;
315 Int_t nb = b->GetMaxBaskets();
316 Int_t *lbaskets = b->GetBasketBytes();
317 Long64_t *entries = b->GetBasketEntry();
318 if (!lbaskets || !entries) continue;
319 //we have found the branch. We now register all its baskets
320 //from the requested offset to the basket below fEntrymax
321 Int_t blistsize = b->GetListOfBaskets()->GetSize();
322 for (Int_t j=0;j<nb;j++) {
323 // This basket has already been read, skip it
324 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
325
326 Long64_t pos = b->GetBasketSeek(j);
327 Int_t len = lbaskets[j];
328 if (pos <= 0 || len <= 0) continue;
329 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
330 if (entries[j] >= fEntryNext) continue;
331 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
332 if (elist) {
333 Long64_t emax = fEntryMax;
334 if (j < nb - 1) emax = entries[j+1] - 1;
335 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
336 }
337 fNReadPref++;
338
339 TFileCacheRead::Prefetch(pos, len);
340 }
341 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
342 }
343
344 // Now fix the size of the status arrays
345 ResetCache();
347
348 return kTRUE;
349}
350
351////////////////////////////////////////////////////////////////////////////////
352/// Change the underlying buffer size of the cache.
353/// Returns:
354/// - 0 if the buffer content is still available
355/// - 1 if some or all of the buffer content has been made unavailable
356/// - -1 on error
357
359{
360 Int_t res = TTreeCache::SetBufferSize(buffersize);
361 if (res < 0) {
362 return res;
363 }
365 ResetCache();
366 return 1;
367}
368
369////////////////////////////////////////////////////////////////////////////////
370/// Set the minimum and maximum entry number to be processed
371/// this information helps to optimize the number of baskets to read
372/// when prefetching the branch buffers.
373
375{
376 TTreeCache::SetEntryRange(emin, emax);
377}
378
379////////////////////////////////////////////////////////////////////////////////
380/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
381/// we start the unzipping just after getting the buffers
382
384{
386}
387
388////////////////////////////////////////////////////////////////////////////////
389///update pointer to current Tree and recompute pointers to the branches in the cache
390
392{
394}
395
396////////////////////////////////////////////////////////////////////////////////
397// //
398// From now on we have the methods concerning the threading part of the cache //
399// //
400////////////////////////////////////////////////////////////////////////////////
401
402////////////////////////////////////////////////////////////////////////////////
403/// Static function that returns the parallel option
404/// (to indicate an additional thread)
405
407{
408 return fgParallel;
409}
410
411////////////////////////////////////////////////////////////////////////////////
/// Static function that tells whether multithreaded unzipping is activated
413
415{
416 if (fgParallel == kEnable || fgParallel == kForce)
417 return kTRUE;
418
419 return kFALSE;
420}
421
422////////////////////////////////////////////////////////////////////////////////
423/// Static function that (de)activates multithreading unzipping
424///
425/// The possible options are:
426/// - kEnable _Enable_ it, which causes an automatic detection and launches the
427/// additional thread if the number of cores in the machine is greater than
428/// one
429/// - kDisable _Disable_ will not activate the additional thread.
430/// - kForce _Force_ will start the additional thread even if there is only one
431/// core. the default will be taken as kEnable.
432///
433/// Returns 0 if there was an error, 1 otherwise.
434
436{
438 fgParallel = option;
439 return 1;
440 }
441 return 0;
442}
443
444////////////////////////////////////////////////////////////////////////////////
445// //
446// From now on we have the methods concerning the unzipping part of the cache //
447// //
448////////////////////////////////////////////////////////////////////////////////
449
450////////////////////////////////////////////////////////////////////////////////
451/// Read the logical record header from the buffer buf.
452/// That must be the pointer tho the header part not the object by itself and
453/// must contain data of at least maxbytes
454/// Returns nread;
455///
456/// In output arguments:
457///
458/// - nbytes : number of bytes in record
459/// if negative, this is a deleted record
460/// if 0, cannot read record, wrong value of argument first
461/// - objlen : uncompressed object size
462/// - keylen : length of logical record header
463///
464/// Note that the arguments objlen and keylen are returned only
465/// if maxbytes >=16
466/// Note: This was adapted from TFile... so some things dont apply
467
468Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
469{
470 Version_t versionkey;
471 Short_t klen;
472 UInt_t datime;
473 Int_t nb = 0, olen;
474 Int_t nread = maxbytes;
475 frombuf(buf, &nb);
476 nbytes = nb;
477 if (nb < 0) return nread;
478 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
479 const Int_t headerSize = 16;
480 if (nread < headerSize) return nread;
481 frombuf(buf, &versionkey);
482 frombuf(buf, &olen);
483 frombuf(buf, &datime);
484 frombuf(buf, &klen);
485 if (!olen) olen = nbytes - klen;
486 objlen = olen;
487 keylen = klen;
488 return nread;
489}
490
491////////////////////////////////////////////////////////////////////////////////
492/// This will delete the list of buffers that are in the unzipping cache
493/// and will reset certain values in the cache.
/// This name is ambiguous because the method doesn't reset the whole cache,
495/// only the part related to the unzipping
496/// Note: This method is completely different from TTreeCache::ResetCache(),
497/// in that method we were cleaning the prefetching buffer while here we
498/// delete the information about the unzipped buffers
499
501{
502 // Reset all the lists and wipe all the chunks
503 fCycle++;
505
506 if(fNseekMax < fNseek){
507 if (gDebug > 0)
508 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
509
512 }
513 fEmpty = kTRUE;
514}
515
516////////////////////////////////////////////////////////////////////////////////
517/// This inflates a basket in the cache.. passing the data to a new
518/// buffer that will only wait there to be read...
519/// This function is responsible to update corresponding elements in
520/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
521/// in fUnzipStatus to exclusively unzip the basket, we must update
522/// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
523/// and fUnzipLen are ready before main thread fetch the data.
524
526{
527 Int_t myCycle;
528 const Int_t hlen = 128;
529 Int_t objlen = 0, keylen = 0;
530 Int_t nbytes = 0;
531 Int_t readbuf = 0;
532
533 Long64_t rdoffs = 0;
534 Int_t rdlen = 0;
535
536 // To synchronize with the 'paging'
537 myCycle = fCycle;
538 rdoffs = fSeek[index];
539 rdlen = fSeekLen[index];
540
541 Int_t loc = -1;
542 if (!fNseek || fIsLearning) {
543 return 1;
544 }
545
546 if ((myCycle != fCycle) || !fIsTransferred) {
547 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
548 return 1;
549 }
550
551 // Prepare a memory buffer of adequate size
552 char* locbuff = 0;
553 if (rdlen > 16384) {
554 locbuff = new char[rdlen];
555 } else if (rdlen * 3 < 16384) {
556 locbuff = new char[rdlen * 2];
557 } else {
558 locbuff = new char[16384];
559 }
560
561 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
562
563 if (readbuf <= 0) {
564 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
565 if (locbuff) delete [] locbuff;
566 return -1;
567 }
568
569 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
570
571 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
572 // If the single unzipped chunk is really too big, reset it to not processable
573 // I.e. mark it as done but set the pointer to 0
574 // This block will be unzipped synchronously in the main thread
575 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
576 if (len > 4 * fUnzipBufferSize) {
577 if (gDebug > 0)
578 Info("UnzipCache", "Block %d is too big, skipping.", index);
579
580 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
581 if (locbuff) delete [] locbuff;
582 return 0;
583 }
584
585 // Unzip it into a new blk
586 char *ptr = 0;
587 Int_t loclen = UnzipBuffer(&ptr, locbuff);
588 if ((loclen > 0) && (loclen == objlen + keylen)) {
589 if ((myCycle != fCycle) || !fIsTransferred) {
590 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
591 if (locbuff) delete [] locbuff;
592 return 1;
593 }
594 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
595 fNUnzip++;
596 } else {
597 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
598 }
599
600 if (locbuff) delete [] locbuff;
601 return 0;
602}
603
604#ifdef R__USE_IMT
605////////////////////////////////////////////////////////////////////////////////
/// We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total)
/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
/// a group of baskets. The purpose of creating TTaskGroup is to avoid competing with the main thread.
609
611{
612 auto mapFunction = [&]() {
613 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
614 // If cache is invalidated and we should return immediately.
615 if (!fIsTransferred) return nullptr;
616
617 for (auto ii : indices) {
618 if(fUnzipState.TryUnzipping(ii)) {
619 Int_t res = UnzipCache(ii);
620 if(res)
621 if (gDebug > 0)
622 Info("UnzipCache", "Unzipping failed or cache is in learning state");
623 }
624 }
625 return nullptr;
626 };
627
628 Int_t accusz = 0;
629 std::vector<std::vector<Int_t>> basketIndices;
630 std::vector<Int_t> indices;
631 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
632 for (Int_t i = 0; i < fNseek; i++) {
633 while (accusz < fUnzipGroupSize) {
634 accusz += fSeekLen[i];
635 indices.push_back(i);
636 i++;
637 if (i >= fNseek) break;
638 }
639 if (i < fNseek) i--;
640 basketIndices.push_back(indices);
641 indices.clear();
642 accusz = 0;
643 }
645 pool.Foreach(unzipFunction, basketIndices);
646 };
647
649 fUnzipTaskGroup->Run(mapFunction);
650
651 return 0;
652}
653#endif
654
655////////////////////////////////////////////////////////////////////////////////
656/// We try to read a buffer that has already been unzipped
657/// Returns -1 in case of read failure, 0 in case it's not in the
658/// cache and n>0 in case read from cache (number of bytes copied).
659/// pos and len are the original values as were passed to ReadBuffer
660/// but instead we will return the inflated buffer.
661/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
/// responsibility of the caller to free it... it is useful for example
663/// to pass it to the creator of TBuffer
664
666{
667 Int_t res = 0;
668 Int_t loc = -1;
669
670 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
671 // pointer to the original zipped chunk
672 // its index in the original unsorted offsets lists
673 //
674 // Actually there are situations in which copying the buffer is not
675 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
676 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
677 // better to be done in the main thread.
678
679 Int_t myCycle = fCycle;
680
681 if (fParallel && !fIsLearning) {
682
683 if(fNseekMax < fNseek){
684 if (gDebug > 0)
685 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
686
689 }
690
692 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
693
694 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
695 // In order to get its info
696 Int_t seekidx = fSeekIndex[loc];
697
698 do {
699
700 // If the block is ready we get it immediately.
701 // And also we don't have to alloc the blks. This is supposed to be
702 // the main thread of the app.
703 if (fUnzipState.IsUnzipped(seekidx)) {
704 if(!(*buf)) {
705 *buf = fUnzipState.fUnzipChunks[seekidx].get();
706 fUnzipState.fUnzipChunks[seekidx].release();
707 *free = kTRUE;
708 } else {
709 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
710 fUnzipState.fUnzipChunks[seekidx].reset();
711 *free = kFALSE;
712 }
713
714 fNFound++;
715 return fUnzipState.fUnzipLen[seekidx];
716 }
717
718 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
719 Int_t reqi = -1;
720
721 if (fUnzipState.IsProgress(seekidx)) {
722 if (fEmpty) {
723 for (Int_t ii = 0; ii < fNseek; ++ii) {
724 Int_t idx = (seekidx + 1 + ii) % fNseek;
725 if (fUnzipState.IsUntouched(idx)) {
726 if(fUnzipState.TryUnzipping(idx)) {
727 reqi = idx;
728 break;
729 }
730 }
731 }
732 if (reqi < 0) {
733 fEmpty = kFALSE;
734 } else {
735 UnzipCache(reqi);
736 }
737 }
738
739 if ( myCycle != fCycle ) {
740 if (gDebug > 0)
741 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
743
744 seekidx = -1;
745 break;
746 }
747 }
748
749 } while (fUnzipState.IsProgress(seekidx));
750
751 // Here the block is not pending. It could be done or aborted or not yet being processed.
752 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
753 if(!(*buf)) {
754 *buf = fUnzipState.fUnzipChunks[seekidx].get();
755 fUnzipState.fUnzipChunks[seekidx].release();
756 *free = kTRUE;
757 } else {
758 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
759 fUnzipState.fUnzipChunks[seekidx].reset();
760 *free = kFALSE;
761 }
762
763 fNStalls++;
764 return fUnzipState.fUnzipLen[seekidx];
765 } else {
766 // This is a complete miss. We want to avoid the background tasks
767 // to try unzipping this block in the future.
768 fUnzipState.SetMissed(seekidx);
769 }
770 } else {
771 loc = -1;
773 }
774 }
775
776 if (len > fCompBufferSize) {
777 if(fCompBuffer) delete [] fCompBuffer;
778 fCompBuffer = new char[len];
779 fCompBufferSize = len;
780 } else {
781 if (fCompBufferSize > len * 4) {
782 if(fCompBuffer) delete [] fCompBuffer;
783 fCompBuffer = new char[len*2];
784 fCompBufferSize = len * 2;
785 }
786 }
787
788 res = 0;
789 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
790 // Cache is invalidated and we need to wait for all unzipping tasks to befinished before fill new baskets in cache.
791#ifdef R__USE_IMT
793 fUnzipTaskGroup->Cancel();
794 fUnzipTaskGroup.reset();
795 }
796#endif
797 {
798 // Fill new baskets into cache.
799 R__LOCKGUARD(fIOMutex.get());
800 fFile->Seek(pos);
801 res = fFile->ReadBuffer(fCompBuffer, len);
802 } // end of lock scope
803#ifdef R__USE_IMT
805 CreateTasks();
806 }
807#endif
808 }
809
810 if (res) res = -1;
811
812 if (!res) {
813 res = UnzipBuffer(buf, fCompBuffer);
814 *free = kTRUE;
815 }
816
817 if (!fIsLearning) {
818 fNMissed++;
819 }
820
821 return res;
822}
823
824////////////////////////////////////////////////////////////////////////////////
/// Static function: sets the relative size of the unzip buffer
826
828{
829 fgRelBuffSize = relbufferSize;
830}
831
832////////////////////////////////////////////////////////////////////////////////
833/// Sets the size for the unzipping cache... by default it should be
834/// two times the size of the prefetching cache
835
837{
838 fUnzipBufferSize = bufferSize;
839}
840
841////////////////////////////////////////////////////////////////////////////////
842/// Unzips a ROOT specific buffer... by reading the header at the beginning.
843/// returns the size of the inflated buffer or -1 if error
844/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
/// responsibility of the caller to free it... it is useful for example
846/// to pass it to the creator of TBuffer
847/// src is the original buffer with the record (header+compressed data)
848/// *dest is the inflated buffer (including the header)
849
851{
852 Int_t uzlen = 0;
853 Bool_t alloc = kFALSE;
854
855 // Here we read the header of the buffer
856 const Int_t hlen = 128;
857 Int_t nbytes = 0, objlen = 0, keylen = 0;
858 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
859
860 if (!(*dest)) {
861 /* early consistency check */
862 UChar_t *bufcur = (UChar_t *) (src + keylen);
863 Int_t nin, nbuf;
864 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
865 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
866 uzlen = -1;
867 return uzlen;
868 }
869 Int_t l = keylen + objlen;
870 *dest = new char[l];
871 alloc = kTRUE;
872 }
873 // Must unzip the buffer
874 // fSeekPos[ind]; adress of zipped buffer
875 // fSeekLen[ind]; len of the zipped buffer
876 // &fBuffer[fSeekPos[ind]]; memory address
877
878 // This is similar to TBasket::ReadBasketBuffers
879 Bool_t oldCase = objlen == nbytes - keylen
880 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
881 && fFile->GetVersion() <= 30401;
882
883 if (objlen > nbytes-keylen || oldCase) {
884
885 // Copy the key
886 memcpy(*dest, src, keylen);
887 uzlen += keylen;
888
889 char *objbuf = *dest + keylen;
890 UChar_t *bufcur = (UChar_t *) (src + keylen);
891 Int_t nin, nbuf;
892 Int_t nout = 0;
893 Int_t noutot = 0;
894
895 while (1) {
896 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
897 if (hc != 0) break;
898 if (gDebug > 2)
899 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
900 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
901 if (oldCase && (nin > objlen || nbuf > objlen)) {
902 if (gDebug > 2)
903 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
904
905 //buffer was very likely not compressed in an old version
906 memcpy(*dest + keylen, src + keylen, objlen);
907 uzlen += objlen;
908 return uzlen;
909 }
910
911 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
912
913 if (gDebug > 2)
914 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
915 nin, bufcur, nbuf, objbuf, nout);
916
917 if (!nout) break;
918 noutot += nout;
919 if (noutot >= objlen) break;
920 bufcur += nin;
921 objbuf += nout;
922 }
923
924 if (noutot != objlen) {
925 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
926 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
927 uzlen = -1;
928 if(alloc) delete [] *dest;
929 *dest = 0;
930 return uzlen;
931 }
932 uzlen += objlen;
933 } else {
934 memcpy(*dest, src, keylen);
935 uzlen += keylen;
936 memcpy(*dest + keylen, src + keylen, objlen);
937 uzlen += objlen;
938 }
939 return uzlen;
940}
941
942////////////////////////////////////////////////////////////////////////////////
943
944void TTreeCacheUnzip::Print(Option_t* option) const {
945
946 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
947 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
948 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
949 printf("Number of hits: %d\n", fNFound);
950 printf("Number of stalls: %d\n", fNStalls);
951 printf("Number of misses: %d\n", fNMissed);
952
953 TTreeCache::Print(option);
954}
955
956////////////////////////////////////////////////////////////////////////////////
957
959 R__LOCKGUARD(fIOMutex.get());
960 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
961}
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
void Class()
Definition: Class.C:29
#define b(i)
Definition: RSha256.hxx:100
unsigned char Byte_t
Definition: RtypesCore.h:62
int Int_t
Definition: RtypesCore.h:43
short Version_t
Definition: RtypesCore.h:63
unsigned char UChar_t
Definition: RtypesCore.h:36
const Bool_t kFALSE
Definition: RtypesCore.h:90
short Short_t
Definition: RtypesCore.h:37
double Double_t
Definition: RtypesCore.h:57
R__EXTERN Int_t gDebug
Definition: RtypesCore.h:117
long long Long64_t
Definition: RtypesCore.h:71
float Float_t
Definition: RtypesCore.h:55
const Bool_t kTRUE
Definition: RtypesCore.h:89
const char Option_t
Definition: RtypesCore.h:64
#define ClassImp(name)
Definition: Rtypes.h:361
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
#define free
Definition: civetweb.c:1539
A class to manage the asynchronous execution of work items.
Definition: TTaskGroup.hxx:21
This class provides a simple interface to execute the same task multiple times in parallel,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
A TTree is a list of TBranches.
Definition: TBranch.h:91
A chain is a collection of files containing TTree objects.
Definition: TChain.h:34
virtual Int_t GetTreeNumber() const
Definition: TChain.h:117
Long64_t * GetTreeOffset() const
Definition: TChain.h:118
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Definition: TEventList.cxx:172
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition: TFile.cxx:2135
Int_t GetVersion() const
Definition: TFile.h:236
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition: TFile.cxx:5044
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition: TFile.cxx:1631
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:90
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:877
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:891
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:865
A TTreeCache which exploits parallelized decompression of its own content.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
UnzipState_t fUnzipState
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Int_t fNseekMax
! fNseek can change so we need to know its max size
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
void UpdateBranches(TTree *tree)
update pointer to current Tree and recompute pointers to the branches in the cache
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relatibe buffer size
void Print(Option_t *option="") const
Print cache statistics.
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total) to a task.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
static Bool_t IsParallelUnzip()
Static function that tells wether the multithreading unzipping is activated.
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Int_t fNFound
! number of blocks that were found in the cache
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped Returns -1 in case of read failure,...
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition: TTreeCache.h:35
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache this function is called by the user vi...
Definition: TTreeCache.cxx:368
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition: TTreeCache.h:41
Long64_t fEntryNext
! next entry number where cache must be filled
Definition: TTreeCache.h:44
Bool_t fIsLearning
! true if cache is in learning mode
Definition: TTreeCache.h:57
TTree * GetTree() const
Definition: TTreeCache.h:152
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed this information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition: TTreeCache.h:42
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition: TTreeCache.h:43
Int_t fNReadPref
Number of blocks that were prefetched.
Definition: TTreeCache.h:52
TTree * fTree
! pointer to the current Tree
Definition: TTreeCache.h:56
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition: TTreeCache.h:47
virtual void Print(Option_t *option="") const
Print cache statistics.
TObjArray * fBranches
! List of branches to be stored in the cache
Definition: TTreeCache.h:54
Helper class to iterate over cluster of baskets.
Definition: TTree.h:265
Long64_t GetNextEntry()
Definition: TTree.h:302
A TTree represents a columnar dataset.
Definition: TTree.h:78
TEventList * GetEventList() const
Definition: TTree.h:467
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:557
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition: TMathBase.h:278
Definition: tree.py:1
Bool_t IsFinished(Int_t index) const
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
Bool_t IsProgress(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
Bool_t IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
auto * l
Definition: textangle.C:4
#define dest(otri, vertexptr)
Definition: triangle.c:1040