Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "TTreeCacheUnzip.h"
22#include "TBranch.h"
23#include "TChain.h"
24#include "TEnv.h"
25#include "TEventList.h"
26#include "TFile.h"
27#include "TMath.h"
28#include "TROOT.h"
29#include "TMutex.h"
30#include "ROOT/RMakeUnique.hxx"
31
32#ifdef R__USE_IMT
34#include "ROOT/TTaskGroup.hxx"
35#endif
36
37extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
38extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
39
41
42// The unzip cache does not consume memory by itself, it just allocates in advance
43// mem blocks which are then picked as they are by the baskets.
44// Hence there is no good reason to limit it too much
46
48
49////////////////////////////////////////////////////////////////////////////////
50/// Clear all baskets' state arrays.
51
53 for (Int_t i = 0; i < size; i++) {
54 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
55 if (fUnzipChunks) {
56 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
57 }
58 if (fUnzipStatus) fUnzipStatus[i].store(0);
59 }
60}
61
62////////////////////////////////////////////////////////////////////////////////
63
65 return fUnzipStatus[index].load() == kUntouched;
66}
67
68////////////////////////////////////////////////////////////////////////////////
69
71 return fUnzipStatus[index].load() == kProgress;
72}
73
74////////////////////////////////////////////////////////////////////////////////
75
77 return fUnzipStatus[index].load() == kFinished;
78}
79
80////////////////////////////////////////////////////////////////////////////////
81/// Check if the basket is unzipped already. We must make sure the length in
82/// fUnzipLen is larger than 0.
83
85 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
86}
87
88////////////////////////////////////////////////////////////////////////////////
89/// Reset all baskets' state arrays. This function is only called by main
90/// thread and parallel processing from upper layers should be disabled such
91/// as IMT in TTree::GetEntry(). Other threads should not call this function
92/// since it is not thread-safe.
93
95 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
96 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
97 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
98
99 for (Int_t i = 0; i < newSize; ++i)
100 aUnzipStatus[i].store(0);
101
102 for (Int_t i = 0; i < oldSize; i++) {
103 aUnzipLen[i] = fUnzipLen[i];
104 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
105 aUnzipStatus[i].store(fUnzipStatus[i].load());
106 }
107
108 if (fUnzipChunks) delete [] fUnzipChunks;
109 if (fUnzipStatus) delete [] fUnzipStatus;
110
111 fUnzipLen = aUnzipLen;
112 fUnzipChunks = aUnzipChunks;
113 fUnzipStatus = aUnzipStatus;
114}
115
116////////////////////////////////////////////////////////////////////////////////
117/// Set cache as finished.
118/// There are three scenarios that a basket is set as finished:
119/// 1. The basket has already been unzipped.
120/// 2. The thread is aborted from unzipping process.
121/// 3. To avoid other tasks/threads work on this basket,
122/// main thread marks the basket as finished and designates itself
123/// to unzip this basket.
124
126 fUnzipLen[index] = 0;
127 fUnzipChunks[index].reset();
128 fUnzipStatus[index].store((Byte_t)kFinished);
129}
130
131////////////////////////////////////////////////////////////////////////////////
132
134 fUnzipChunks[index].reset();
135 fUnzipStatus[index].store((Byte_t)kFinished);
136}
137
138////////////////////////////////////////////////////////////////////////////////
139
141 // Update status array at the very end because we need to be synchronous with the main thread.
142 fUnzipLen[index] = len;
143 fUnzipChunks[index].reset(buf);
144 fUnzipStatus[index].store((Byte_t)kFinished);
145}
146
147////////////////////////////////////////////////////////////////////////////////
148/// Start unzipping the basket if it is untouched yet.
149
151 Byte_t oldValue = kUntouched;
152 Byte_t newValue = kProgress;
153 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
154}
155
156////////////////////////////////////////////////////////////////////////////////
157
160 fEmpty(kTRUE),
161 fCycle(0),
162 fNseekMax(0),
165 fNFound(0),
166 fNMissed(0),
167 fNStalls(0),
168 fNUnzip(0)
169{
170 // Default Constructor.
171 Init();
172}
173
174////////////////////////////////////////////////////////////////////////////////
175/// Constructor.
176
178 fAsyncReading(kFALSE),
179 fEmpty(kTRUE),
180 fCycle(0),
181 fNseekMax(0),
182 fUnzipGroupSize(0),
183 fUnzipBufferSize(0),
184 fNFound(0),
185 fNMissed(0),
186 fNStalls(0),
187 fNUnzip(0)
188{
189 Init();
190}
191
192////////////////////////////////////////////////////////////////////////////////
193/// Initialization procedure common to all the constructors.
194
196{
197#ifdef R__USE_IMT
198 fUnzipTaskGroup.reset();
199#endif
200 fIOMutex = std::make_unique<TMutex>(kTRUE);
201
202 fCompBuffer = new char[16384];
203 fCompBufferSize = 16384;
204
205 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
206
207 if (fgParallel == kDisable) {
209 }
210 else if(fgParallel == kEnable || fgParallel == kForce) {
212
213 if(gDebug > 0)
214 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
215
217
218 }
219 else {
220 Warning("TTreeCacheUnzip", "Parallel Option unknown");
221 }
222
223 // Check if asynchronous reading is supported by this TFile specialization
224 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
225 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
227 }
228
229}
230
231////////////////////////////////////////////////////////////////////////////////
232/// Destructor. (in general called by the TFile destructor)
233
235{
236 ResetCache();
238}
239
240////////////////////////////////////////////////////////////////////////////////
241/// Add a branch to the list of branches to be stored in the cache
242/// this function is called by TBranch::GetBasket
243/// Returns:
244/// - 0 branch added or already included
245/// - -1 on error
246
248{
249 return TTreeCache::AddBranch(b, subbranches);
250}
251
252////////////////////////////////////////////////////////////////////////////////
253/// Add a branch to the list of branches to be stored in the cache
254/// this function is called by TBranch::GetBasket
255/// Returns:
256/// - 0 branch added or already included
257/// - -1 on error
258
259Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
260{
261 return TTreeCache::AddBranch(branch, subbranches);
262}
263
264////////////////////////////////////////////////////////////////////////////////
265
267{
268
269 if (fNbranches <= 0) return kFALSE;
270
271 // Fill the cache buffer with the branches in the cache.
273
275 Long64_t entry = tree->GetReadEntry();
276
277 // If the entry is in the range we previously prefetched, there is
278 // no point in retrying. Note that this will also return false
279 // during the training phase (fEntryNext is then set intentional to
280 // the end of the training phase).
281 if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
282
283 // Triggered by the user, not the learning phase
284 if (entry == -1) entry = 0;
285
286 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
287 fEntryCurrent = clusterIter();
288 fEntryNext = clusterIter.GetNextEntry();
289
291 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
293
294 // Check if owner has a TEventList set. If yes we optimize for this
295 // Special case reading only the baskets containing entries in the
296 // list.
297 TEventList *elist = fTree->GetEventList();
298 Long64_t chainOffset = 0;
299 if (elist) {
300 if (fTree->IsA() ==TChain::Class()) {
301 TChain *chain = (TChain*)fTree;
302 Int_t t = chain->GetTreeNumber();
303 chainOffset = chain->GetTreeOffset()[t];
304 }
305 }
306
307 //clear cache buffer
309
310 //store baskets
311 for (Int_t i = 0; i < fNbranches; i++) {
313 if (b->GetDirectory() == 0) continue;
314 if (b->GetDirectory()->GetFile() != fFile) continue;
315 Int_t nb = b->GetMaxBaskets();
316 Int_t *lbaskets = b->GetBasketBytes();
317 Long64_t *entries = b->GetBasketEntry();
318 if (!lbaskets || !entries) continue;
319 //we have found the branch. We now register all its baskets
320 //from the requested offset to the basket below fEntrymax
321 Int_t blistsize = b->GetListOfBaskets()->GetSize();
322 for (Int_t j=0;j<nb;j++) {
323 // This basket has already been read, skip it
324 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
325
326 Long64_t pos = b->GetBasketSeek(j);
327 Int_t len = lbaskets[j];
328 if (pos <= 0 || len <= 0) continue;
329 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
330 if (entries[j] >= fEntryNext) continue;
331 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
332 if (elist) {
333 Long64_t emax = fEntryMax;
334 if (j < nb - 1) emax = entries[j+1] - 1;
335 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
336 }
337 fNReadPref++;
338
339 TFileCacheRead::Prefetch(pos, len);
340 }
341 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
342 }
343
344 // Now fix the size of the status arrays
345 ResetCache();
347
348 return kTRUE;
349}
350
351////////////////////////////////////////////////////////////////////////////////
352/// Change the underlying buffer size of the cache.
353/// Returns:
354/// - 0 if the buffer content is still available
355/// - 1 if some or all of the buffer content has been made unavailable
356/// - -1 on error
357
359{
360 Int_t res = TTreeCache::SetBufferSize(buffersize);
361 if (res < 0) {
362 return res;
363 }
365 ResetCache();
366 return 1;
367}
368
369////////////////////////////////////////////////////////////////////////////////
370/// Set the minimum and maximum entry number to be processed
371/// this information helps to optimize the number of baskets to read
372/// when prefetching the branch buffers.
373
375{
376 TTreeCache::SetEntryRange(emin, emax);
377}
378
379////////////////////////////////////////////////////////////////////////////////
380/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
381/// we start the unzipping just after getting the buffers
382
384{
386}
387
388////////////////////////////////////////////////////////////////////////////////
389///update pointer to current Tree and recompute pointers to the branches in the cache
390
392{
394}
395
396////////////////////////////////////////////////////////////////////////////////
397// //
398// From now on we have the methods concerning the threading part of the cache //
399// //
400////////////////////////////////////////////////////////////////////////////////
401
402////////////////////////////////////////////////////////////////////////////////
403/// Static function that returns the parallel option
404/// (to indicate an additional thread)
405
407{
408 return fgParallel;
409}
410
411////////////////////////////////////////////////////////////////////////////////
412/// Static function that tells whether the multithreading unzipping is activated
413
415{
416 if (fgParallel == kEnable || fgParallel == kForce)
417 return kTRUE;
418
419 return kFALSE;
420}
421
422////////////////////////////////////////////////////////////////////////////////
423/// Static function that (de)activates multithreading unzipping
424///
425/// The possible options are:
426/// - kEnable _Enable_ it, which causes an automatic detection and launches the
427/// additional thread if the number of cores in the machine is greater than
428/// one
429/// - kDisable _Disable_ will not activate the additional thread.
430/// - kForce _Force_ will start the additional thread even if there is only one
431/// core. The default will be taken as kEnable.
432///
433/// Returns 0 if there was an error, 1 otherwise.
434
436{
438 fgParallel = option;
439 return 1;
440 }
441 return 0;
442}
443
444////////////////////////////////////////////////////////////////////////////////
445// //
446// From now on we have the methods concerning the unzipping part of the cache //
447// //
448////////////////////////////////////////////////////////////////////////////////
449
450////////////////////////////////////////////////////////////////////////////////
451/// Read the logical record header from the buffer buf.
452/// That must be the pointer to the header part, not the object itself, and
453/// must contain data of at least maxbytes
454/// Returns nread;
455///
456/// In output arguments:
457///
458/// - nbytes : number of bytes in record
459/// if negative, this is a deleted record
460/// if 0, cannot read record, wrong value of argument first
461/// - objlen : uncompressed object size
462/// - keylen : length of logical record header
463///
464/// Note that the arguments objlen and keylen are returned only
465/// if maxbytes >=16
466/// Note: This was adapted from TFile... so some things don't apply
467
468Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
469{
470 Version_t versionkey;
471 Short_t klen;
472 UInt_t datime;
473 Int_t nb = 0, olen;
474 Int_t nread = maxbytes;
475 frombuf(buf, &nb);
476 nbytes = nb;
477 if (nb < 0) return nread;
478 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
479 const Int_t headerSize = 16;
480 if (nread < headerSize) return nread;
481 frombuf(buf, &versionkey);
482 frombuf(buf, &olen);
483 frombuf(buf, &datime);
484 frombuf(buf, &klen);
485 if (!olen) olen = nbytes - klen;
486 objlen = olen;
487 keylen = klen;
488 return nread;
489}
490
491////////////////////////////////////////////////////////////////////////////////
492/// This will delete the list of buffers that are in the unzipping cache
493/// and will reset certain values in the cache.
494/// This name is ambiguous because the method doesn't reset the whole cache,
495/// only the part related to the unzipping
496/// Note: This method is completely different from TTreeCache::ResetCache(),
497/// in that method we were cleaning the prefetching buffer while here we
498/// delete the information about the unzipped buffers
499
501{
502 // Reset all the lists and wipe all the chunks
503 fCycle++;
505
506 if(fNseekMax < fNseek){
507 if (gDebug > 0)
508 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
509
512 }
513 fEmpty = kTRUE;
514}
515
516////////////////////////////////////////////////////////////////////////////////
517/// This inflates a basket in the cache.. passing the data to a new
518/// buffer that will only wait there to be read...
519/// This function is responsible to update corresponding elements in
520/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
521/// in fUnzipStatus to exclusively unzip the basket, we must update
522/// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
523/// and fUnzipLen are ready before main thread fetch the data.
524
526{
527 Int_t myCycle;
528 const Int_t hlen = 128;
529 Int_t objlen = 0, keylen = 0;
530 Int_t nbytes = 0;
531 Int_t readbuf = 0;
532
533 Long64_t rdoffs = 0;
534 Int_t rdlen = 0;
535
536 // To synchronize with the 'paging'
537 myCycle = fCycle;
538 rdoffs = fSeek[index];
539 rdlen = fSeekLen[index];
540
541 Int_t loc = -1;
542 if (!fNseek || fIsLearning) {
543 return 1;
544 }
545
546 if ((myCycle != fCycle) || !fIsTransferred) {
547 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
548 return 1;
549 }
550
551 // Prepare a memory buffer of adequate size
552 char* locbuff = 0;
553 if (rdlen > 16384) {
554 locbuff = new char[rdlen];
555 } else if (rdlen * 3 < 16384) {
556 locbuff = new char[rdlen * 2];
557 } else {
558 locbuff = new char[16384];
559 }
560
561 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
562
563 if (readbuf <= 0) {
564 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
565 if (locbuff) delete [] locbuff;
566 return -1;
567 }
568
569 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
570
571 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
572 // If the single unzipped chunk is really too big, reset it to not processable
573 // I.e. mark it as done but set the pointer to 0
574 // This block will be unzipped synchronously in the main thread
575 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
576 if (len > 4 * fUnzipBufferSize) {
577 if (gDebug > 0)
578 Info("UnzipCache", "Block %d is too big, skipping.", index);
579
580 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
581 if (locbuff) delete [] locbuff;
582 return 0;
583 }
584
585 // Unzip it into a new blk
586 char *ptr = nullptr;
587 Int_t loclen = UnzipBuffer(&ptr, locbuff);
588 if ((loclen > 0) && (loclen == objlen + keylen)) {
589 if ((myCycle != fCycle) || !fIsTransferred) {
590 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
591 if (locbuff) delete [] locbuff;
592 delete [] ptr;
593 return 1;
594 }
595 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
596 fNUnzip++;
597 } else {
598 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
599 delete [] ptr;
600 }
601
602 if (locbuff) delete [] locbuff;
603 return 0;
604}
605
606#ifdef R__USE_IMT
607////////////////////////////////////////////////////////////////////////////////
608/// We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total)
609/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
610/// a group of baskets. The purpose of creating a TTaskGroup is to avoid competing with the main thread.
611
613{
614 auto mapFunction = [&]() {
615 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
616 // If cache is invalidated and we should return immediately.
617 if (!fIsTransferred) return nullptr;
618
619 for (auto ii : indices) {
620 if(fUnzipState.TryUnzipping(ii)) {
621 Int_t res = UnzipCache(ii);
622 if(res)
623 if (gDebug > 0)
624 Info("UnzipCache", "Unzipping failed or cache is in learning state");
625 }
626 }
627 return nullptr;
628 };
629
630 Int_t accusz = 0;
631 std::vector<std::vector<Int_t>> basketIndices;
632 std::vector<Int_t> indices;
633 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
634 for (Int_t i = 0; i < fNseek; i++) {
635 while (accusz < fUnzipGroupSize) {
636 accusz += fSeekLen[i];
637 indices.push_back(i);
638 i++;
639 if (i >= fNseek) break;
640 }
641 if (i < fNseek) i--;
642 basketIndices.push_back(indices);
643 indices.clear();
644 accusz = 0;
645 }
647 pool.Foreach(unzipFunction, basketIndices);
648 };
649
651 fUnzipTaskGroup->Run(mapFunction);
652
653 return 0;
654}
655#endif
656
657////////////////////////////////////////////////////////////////////////////////
658/// We try to read a buffer that has already been unzipped
659/// Returns -1 in case of read failure, 0 in case it's not in the
660/// cache and n>0 in case read from cache (number of bytes copied).
661/// pos and len are the original values as were passed to ReadBuffer
662/// but instead we will return the inflated buffer.
663/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
664/// responsibility of the caller to free it... it is useful for example
665/// to pass it to the creator of TBuffer
666
668{
669 Int_t res = 0;
670 Int_t loc = -1;
671
672 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
673 // pointer to the original zipped chunk
674 // its index in the original unsorted offsets lists
675 //
676 // Actually there are situations in which copying the buffer is not
677 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
678 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
679 // better to be done in the main thread.
680
681 Int_t myCycle = fCycle;
682
683 if (fParallel && !fIsLearning) {
684
685 if(fNseekMax < fNseek){
686 if (gDebug > 0)
687 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
688
691 }
692
694 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
695
696 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
697 // In order to get its info
698 Int_t seekidx = fSeekIndex[loc];
699
700 do {
701
702 // If the block is ready we get it immediately.
703 // And also we don't have to alloc the blks. This is supposed to be
704 // the main thread of the app.
705 if (fUnzipState.IsUnzipped(seekidx)) {
706 if(!(*buf)) {
707 *buf = fUnzipState.fUnzipChunks[seekidx].get();
708 fUnzipState.fUnzipChunks[seekidx].release();
709 *free = kTRUE;
710 } else {
711 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
712 fUnzipState.fUnzipChunks[seekidx].reset();
713 *free = kFALSE;
714 }
715
716 fNFound++;
717 return fUnzipState.fUnzipLen[seekidx];
718 }
719
720 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
721 Int_t reqi = -1;
722
723 if (fUnzipState.IsProgress(seekidx)) {
724 if (fEmpty) {
725 for (Int_t ii = 0; ii < fNseek; ++ii) {
726 Int_t idx = (seekidx + 1 + ii) % fNseek;
727 if (fUnzipState.IsUntouched(idx)) {
728 if(fUnzipState.TryUnzipping(idx)) {
729 reqi = idx;
730 break;
731 }
732 }
733 }
734 if (reqi < 0) {
735 fEmpty = kFALSE;
736 } else {
737 UnzipCache(reqi);
738 }
739 }
740
741 if ( myCycle != fCycle ) {
742 if (gDebug > 0)
743 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
745
746 seekidx = -1;
747 break;
748 }
749 }
750
751 } while (fUnzipState.IsProgress(seekidx));
752
753 // Here the block is not pending. It could be done or aborted or not yet being processed.
754 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
755 if(!(*buf)) {
756 *buf = fUnzipState.fUnzipChunks[seekidx].get();
757 fUnzipState.fUnzipChunks[seekidx].release();
758 *free = kTRUE;
759 } else {
760 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
761 fUnzipState.fUnzipChunks[seekidx].reset();
762 *free = kFALSE;
763 }
764
765 fNStalls++;
766 return fUnzipState.fUnzipLen[seekidx];
767 } else {
768 // This is a complete miss. We want to avoid the background tasks
769 // to try unzipping this block in the future.
770 fUnzipState.SetMissed(seekidx);
771 }
772 } else {
773 loc = -1;
775 }
776 }
777
778 if (len > fCompBufferSize) {
779 if(fCompBuffer) delete [] fCompBuffer;
780 fCompBuffer = new char[len];
781 fCompBufferSize = len;
782 } else {
783 if (fCompBufferSize > len * 4) {
784 if(fCompBuffer) delete [] fCompBuffer;
785 fCompBuffer = new char[len*2];
786 fCompBufferSize = len * 2;
787 }
788 }
789
790 res = 0;
791 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
792 // Cache is invalidated and we need to wait for all unzipping tasks to be finished before fill new baskets in cache.
793#ifdef R__USE_IMT
795 fUnzipTaskGroup->Cancel();
796 fUnzipTaskGroup.reset();
797 }
798#endif
799 {
800 // Fill new baskets into cache.
801 R__LOCKGUARD(fIOMutex.get());
802 fFile->Seek(pos);
803 res = fFile->ReadBuffer(fCompBuffer, len);
804 } // end of lock scope
805#ifdef R__USE_IMT
807 CreateTasks();
808 }
809#endif
810 }
811
812 if (res) res = -1;
813
814 if (!res) {
815 res = UnzipBuffer(buf, fCompBuffer);
816 *free = kTRUE;
817 }
818
819 if (!fIsLearning) {
820 fNMissed++;
821 }
822
823 return res;
824}
825
826////////////////////////////////////////////////////////////////////////////////
827/// static function: Sets the unzip relative buffer size
828
830{
831 fgRelBuffSize = relbufferSize;
832}
833
834////////////////////////////////////////////////////////////////////////////////
835/// Sets the size for the unzipping cache... by default it should be
836/// two times the size of the prefetching cache
837
839{
840 fUnzipBufferSize = bufferSize;
841}
842
843////////////////////////////////////////////////////////////////////////////////
844/// Unzips a ROOT specific buffer... by reading the header at the beginning.
845/// returns the size of the inflated buffer or -1 if error
846/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
847/// responsibility of the caller to free it... it is useful for example
848/// to pass it to the creator of TBuffer
849/// src is the original buffer with the record (header+compressed data)
850/// *dest is the inflated buffer (including the header)
851
853{
854 Int_t uzlen = 0;
855 Bool_t alloc = kFALSE;
856
857 // Here we read the header of the buffer
858 const Int_t hlen = 128;
859 Int_t nbytes = 0, objlen = 0, keylen = 0;
860 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
861
862 if (!(*dest)) {
863 /* early consistency check */
864 UChar_t *bufcur = (UChar_t *) (src + keylen);
865 Int_t nin, nbuf;
866 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
867 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
868 uzlen = -1;
869 return uzlen;
870 }
871 Int_t l = keylen + objlen;
872 *dest = new char[l];
873 alloc = kTRUE;
874 }
875 // Must unzip the buffer
876 // fSeekPos[ind]; adress of zipped buffer
877 // fSeekLen[ind]; len of the zipped buffer
878 // &fBuffer[fSeekPos[ind]]; memory address
879
880 // This is similar to TBasket::ReadBasketBuffers
881 Bool_t oldCase = objlen == nbytes - keylen
882 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
883 && fFile->GetVersion() <= 30401;
884
885 if (objlen > nbytes-keylen || oldCase) {
886
887 // Copy the key
888 memcpy(*dest, src, keylen);
889 uzlen += keylen;
890
891 char *objbuf = *dest + keylen;
892 UChar_t *bufcur = (UChar_t *) (src + keylen);
893 Int_t nin, nbuf;
894 Int_t nout = 0;
895 Int_t noutot = 0;
896
897 while (1) {
898 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
899 if (hc != 0) break;
900 if (gDebug > 2)
901 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
902 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
903 if (oldCase && (nin > objlen || nbuf > objlen)) {
904 if (gDebug > 2)
905 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
906
907 //buffer was very likely not compressed in an old version
908 memcpy(*dest + keylen, src + keylen, objlen);
909 uzlen += objlen;
910 return uzlen;
911 }
912
913 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
914
915 if (gDebug > 2)
916 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
917 nin, bufcur, nbuf, objbuf, nout);
918
919 if (!nout) break;
920 noutot += nout;
921 if (noutot >= objlen) break;
922 bufcur += nin;
923 objbuf += nout;
924 }
925
926 if (noutot != objlen) {
927 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
928 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
929 uzlen = -1;
930 if(alloc) delete [] *dest;
931 *dest = 0;
932 return uzlen;
933 }
934 uzlen += objlen;
935 } else {
936 memcpy(*dest, src, keylen);
937 uzlen += keylen;
938 memcpy(*dest + keylen, src + keylen, objlen);
939 uzlen += objlen;
940 }
941 return uzlen;
942}
943
944////////////////////////////////////////////////////////////////////////////////
945
946void TTreeCacheUnzip::Print(Option_t* option) const {
947
948 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
949 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
950 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
951 printf("Number of hits: %d\n", fNFound);
952 printf("Number of stalls: %d\n", fNStalls);
953 printf("Number of misses: %d\n", fNMissed);
954
955 TTreeCache::Print(option);
956}
957
958////////////////////////////////////////////////////////////////////////////////
959
961 R__LOCKGUARD(fIOMutex.get());
962 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
963}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
#define b(i)
Definition RSha256.hxx:100
unsigned char Byte_t
Definition RtypesCore.h:64
int Int_t
Definition RtypesCore.h:45
short Version_t
Definition RtypesCore.h:65
unsigned char UChar_t
Definition RtypesCore.h:38
const Bool_t kFALSE
Definition RtypesCore.h:92
short Short_t
Definition RtypesCore.h:39
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:73
float Float_t
Definition RtypesCore.h:57
const Bool_t kTRUE
Definition RtypesCore.h:91
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:364
R__EXTERN TEnv * gEnv
Definition TEnv.h:171
Int_t gDebug
Definition TROOT.cxx:590
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
#define free
Definition civetweb.c:1539
A class to manage the asynchronous execution of work items.
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A TTree is a list of TBranches.
Definition TBranch.h:89
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
virtual Int_t GetTreeNumber() const
Definition TChain.h:116
Long64_t * GetTreeOffset() const
Definition TChain.h:117
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual Bool_t ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos it either kBeg, kCur or kEnd.
Definition TFile.cxx:2190
Int_t GetVersion() const
Definition TFile.h:237
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition TFile.cxx:5099
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1686
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:90
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:879
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:893
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:867
A TTreeCache which exploits parallelized decompression of its own content.
virtual void StopLearningPhase()
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
UnzipState_t fUnzipState
Bool_t FillBuffer()
Fill the cache buffer with the branches in the cache.
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Int_t fNseekMax
! fNseek can change so we need to know its max size
virtual ~TTreeCacheUnzip()
Destructor. (in general called by the TFile destructor)
void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
void Print(Option_t *option="") const
Print cache statistics.
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
Bool_t fParallel
Indicate if we want to activate the parallelism (for this instance)
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
static Bool_t IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Int_t fNFound
! number of blocks that were found in the cache
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure,...
virtual Int_t AddBranch(TBranch *b, Bool_t subbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by TBranch::Ge...
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
virtual void ResetCache()
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual Int_t AddBranch(TBranch *b, Bool_t subgbranches=kFALSE)
Add a branch to the list of branches to be stored in the cache; this function is called by the user vi...
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
virtual Int_t SetBufferSize(Int_t buffersize)
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition TTreeCache.h:38
Long64_t fEntryNext
! next entry number where cache must be filled
Definition TTreeCache.h:41
Bool_t fIsLearning
! true if cache is in learning mode
Definition TTreeCache.h:54
TTree * GetTree() const
Definition TTreeCache.h:149
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed; this information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition TTreeCache.h:39
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition TTreeCache.h:40
Int_t fNReadPref
Number of blocks that were prefetched.
Definition TTreeCache.h:49
TTree * fTree
! pointer to the current Tree
Definition TTreeCache.h:53
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition TTreeCache.h:44
virtual void Print(Option_t *option="") const
Print cache statistics.
TObjArray * fBranches
! List of branches to be stored in the cache
Definition TTreeCache.h:51
Helper class to iterate over cluster of baskets.
Definition TTree.h:267
Long64_t GetNextEntry()
Definition TTree.h:304
A TTree represents a columnar dataset.
Definition TTree.h:79
TEventList * GetEventList() const
Definition TTree.h:470
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:556
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Definition TMathBase.h:278
Definition tree.py:1
Bool_t IsFinished(Int_t index) const
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
Bool_t IsProgress(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNseek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
Bool_t TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
Bool_t IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
Bool_t IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
auto * l
Definition textangle.C:4
#define dest(otri, vertexptr)
Definition triangle.c:1040