Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1// Authors: Rene Brun 04/06/2006
2// Leandro Franco 10/04/2008
3// Fabrizio Furano (CERN) Aug 2009
4
5/*************************************************************************
6 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13/**
14\class TTreeCacheUnzip
15\ingroup tree
16
17A TTreeCache which exploits parallelized decompression of its own content.
18
19*/
20
21#include "TTreeCacheUnzip.h"
22#include "TBranch.h"
23#include "TChain.h"
24#include "TEnv.h"
25#include "TEventList.h"
26#include "TFile.h"
27#include "TMath.h"
28#include "TROOT.h"
29#include "TMutex.h"
30
31#ifdef R__USE_IMT
33#include "ROOT/TTaskGroup.hxx"
34#endif
35
36#include <memory>
37
38extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
39extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
40
42
43// The unzip cache does not consume memory by itself, it just allocates in advance
44// mem blocks which are then picked as they are by the baskets.
45// Hence there is no good reason to limit it too much
47
49
50////////////////////////////////////////////////////////////////////////////////
51/// Clear all baskets' state arrays.
52
54 for (Int_t i = 0; i < size; i++) {
55 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
56 if (fUnzipChunks) {
57 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
58 }
59 if (fUnzipStatus) fUnzipStatus[i].store(0);
60 }
61}
62
63////////////////////////////////////////////////////////////////////////////////
64
66 return fUnzipStatus[index].load() == kUntouched;
67}
68
69////////////////////////////////////////////////////////////////////////////////
70
72 return fUnzipStatus[index].load() == kProgress;
73}
74
75////////////////////////////////////////////////////////////////////////////////
76
78 return fUnzipStatus[index].load() == kFinished;
79}
80
81////////////////////////////////////////////////////////////////////////////////
82/// Check if the basket is unzipped already. We must make sure the length in
83/// fUnzipLen is larger than 0.
84
86 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
87}
88
89////////////////////////////////////////////////////////////////////////////////
90/// Reset all baskets' state arrays. This function is only called by the main
91/// thread, and parallel processing from upper layers (such as IMT in
92/// TTree::GetEntry()) should be disabled. Other threads should not call this
93/// function since it is not thread-safe.
94
96 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
97 std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
98 std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
99
100 for (Int_t i = 0; i < newSize; ++i)
101 aUnzipStatus[i].store(0);
102
103 for (Int_t i = 0; i < oldSize; i++) {
104 aUnzipLen[i] = fUnzipLen[i];
105 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
106 aUnzipStatus[i].store(fUnzipStatus[i].load());
107 }
108
109 if (fUnzipChunks) delete [] fUnzipChunks;
110 if (fUnzipStatus) delete [] fUnzipStatus;
111
112 fUnzipLen = aUnzipLen;
113 fUnzipChunks = aUnzipChunks;
114 fUnzipStatus = aUnzipStatus;
115}
116
117////////////////////////////////////////////////////////////////////////////////
118/// Set cache as finished.
119/// There are three scenarios that a basket is set as finished:
120/// 1. The basket has already been unzipped.
121/// 2. The thread is aborted from unzipping process.
122/// 3. To avoid other tasks/threads work on this basket,
123/// main thread marks the basket as finished and designates itself
124/// to unzip this basket.
125
127 fUnzipLen[index] = 0;
128 fUnzipChunks[index].reset();
129 fUnzipStatus[index].store((Byte_t)kFinished);
130}
131
132////////////////////////////////////////////////////////////////////////////////
133
135 fUnzipChunks[index].reset();
136 fUnzipStatus[index].store((Byte_t)kFinished);
137}
138
139////////////////////////////////////////////////////////////////////////////////
140
142 // Update status array at the very end because we need to be synchronous with the main thread.
143 fUnzipLen[index] = len;
144 fUnzipChunks[index].reset(buf);
145 fUnzipStatus[index].store((Byte_t)kFinished);
146}
147
148////////////////////////////////////////////////////////////////////////////////
149/// Start unzipping the basket if it is untouched yet.
150
152 Byte_t oldValue = kUntouched;
153 Byte_t newValue = kProgress;
154 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
155}
156
157////////////////////////////////////////////////////////////////////////////////
158
160 fAsyncReading(false),
161 fEmpty(true),
162 fCycle(0),
163 fNseekMax(0),
166 fNFound(0),
167 fNMissed(0),
168 fNStalls(0),
169 fNUnzip(0)
170{
171 // Default Constructor.
172 Init();
173}
174
175////////////////////////////////////////////////////////////////////////////////
176/// Constructor.
177
178TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
179 fAsyncReading(false),
180 fEmpty(true),
181 fCycle(0),
182 fNseekMax(0),
183 fUnzipGroupSize(0),
184 fUnzipBufferSize(0),
185 fNFound(0),
186 fNMissed(0),
187 fNStalls(0),
188 fNUnzip(0)
189{
190 Init();
191}
192
193////////////////////////////////////////////////////////////////////////////////
194/// Initialization procedure common to all the constructors.
195
197{
198#ifdef R__USE_IMT
199 fUnzipTaskGroup.reset();
200#endif
201 fIOMutex = std::make_unique<TMutex>(true);
202
203 fCompBuffer = new char[16384];
204 fCompBufferSize = 16384;
205
206 fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
207
208 if (fgParallel == kDisable) {
209 fParallel = false;
210 }
211 else if(fgParallel == kEnable || fgParallel == kForce) {
213
214 if(gDebug > 0)
215 Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
216
217 fParallel = true;
218
219 }
220 else {
221 Warning("TTreeCacheUnzip", "Parallel Option unknown");
222 }
223
224 // Check if asynchronous reading is supported by this TFile specialization
225 if (gEnv->GetValue("TFile.AsyncReading", 1)) {
226 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
227 fAsyncReading = true;
228 }
229
230}
231
232////////////////////////////////////////////////////////////////////////////////
233/// Destructor. (in general called by the TFile destructor)
234
236{
237 ResetCache();
239}
240
241////////////////////////////////////////////////////////////////////////////////
242/// Add a branch to the list of branches to be stored in the cache
243/// this function is called by TBranch::GetBasket
244/// Returns:
245/// - 0 branch added or already included
246/// - -1 on error
247
248Int_t TTreeCacheUnzip::AddBranch(TBranch *b, bool subbranches /*= false*/)
249{
250 return TTreeCache::AddBranch(b, subbranches);
251}
252
253////////////////////////////////////////////////////////////////////////////////
254/// Add a branch to the list of branches to be stored in the cache
255/// this function is called by TBranch::GetBasket
256/// Returns:
257/// - 0 branch added or already included
258/// - -1 on error
259
260Int_t TTreeCacheUnzip::AddBranch(const char *branch, bool subbranches /*= false*/)
261{
262 return TTreeCache::AddBranch(branch, subbranches);
263}
264
265////////////////////////////////////////////////////////////////////////////////
266
268{
269
270 if (fNbranches <= 0) return false;
271
272 // Fill the cache buffer with the branches in the cache.
273 fIsTransferred = false;
274
275 TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
276 Long64_t entry = tree->GetReadEntry();
277
278 // If the entry is in the range we previously prefetched, there is
279 // no point in retrying. Note that this will also return false
280 // during the training phase (fEntryNext is then set intentional to
281 // the end of the training phase).
282 if (fEntryCurrent <= entry && entry < fEntryNext) return false;
283
284 // Triggered by the user, not the learning phase
285 if (entry == -1) entry = 0;
286
287 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
288 fEntryCurrent = clusterIter();
289 fEntryNext = clusterIter.GetNextEntry();
290
292 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
294
295 // Check if owner has a TEventList set. If yes we optimize for this
296 // Special case reading only the baskets containing entries in the
297 // list.
298 TEventList *elist = fTree->GetEventList();
299 Long64_t chainOffset = 0;
300 if (elist) {
301 if (fTree->IsA() ==TChain::Class()) {
302 TChain *chain = (TChain*)fTree;
303 Int_t t = chain->GetTreeNumber();
304 chainOffset = chain->GetTreeOffset()[t];
305 }
306 }
307
308 //clear cache buffer
310
311 //store baskets
312 for (Int_t i = 0; i < fNbranches; i++) {
314 if (b->GetDirectory() == nullptr) continue;
315 if (b->GetDirectory()->GetFile() != fFile) continue;
316 Int_t nb = b->GetMaxBaskets();
317 Int_t *lbaskets = b->GetBasketBytes();
318 Long64_t *entries = b->GetBasketEntry();
319 if (!lbaskets || !entries) continue;
320 //we have found the branch. We now register all its baskets
321 //from the requested offset to the basket below fEntrymax
322 Int_t blistsize = b->GetListOfBaskets()->GetSize();
323 for (Int_t j=0;j<nb;j++) {
324 // This basket has already been read, skip it
325 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
326
327 Long64_t pos = b->GetBasketSeek(j);
328 Int_t len = lbaskets[j];
329 if (pos <= 0 || len <= 0) continue;
330 //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
331 if (entries[j] >= fEntryNext) continue;
332 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
333 if (elist) {
334 Long64_t emax = fEntryMax;
335 if (j < nb - 1) emax = entries[j+1] - 1;
336 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
337 }
338 fNReadPref++;
339
341 }
342 if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
343 }
344
345 // Now fix the size of the status arrays
346 ResetCache();
347 fIsLearning = false;
348
349 return true;
350}
351
352////////////////////////////////////////////////////////////////////////////////
353/// Change the underlying buffer size of the cache.
354/// The buffersize might be clamped, see TFileCacheRead::SetBufferSize
355/// Returns:
356/// - 0 if the buffer content is still available
357/// - 1 if some or all of the buffer content has been made unavailable
358/// - -1 on error
359
361{
362 Int_t res = TTreeCache::SetBufferSize(buffersize);
363 if (res < 0) {
364 return res;
365 }
367 ResetCache();
368 return 1;
369}
370
371////////////////////////////////////////////////////////////////////////////////
372/// Set the minimum and maximum entry number to be processed
373/// this information helps to optimize the number of baskets to read
374/// when prefetching the branch buffers.
375
377{
378 TTreeCache::SetEntryRange(emin, emax);
379}
380
381////////////////////////////////////////////////////////////////////////////////
382/// It's the same as TTreeCache::StopLearningPhase but we guarantee that
383/// we start the unzipping just after getting the buffers
384
386{
388}
389
390////////////////////////////////////////////////////////////////////////////////
391/// Update pointer to current Tree and recompute pointers to the branches in the cache
392
394{
396}
397
398////////////////////////////////////////////////////////////////////////////////
399// //
400// From now on we have the methods concerning the threading part of the cache //
401// //
402////////////////////////////////////////////////////////////////////////////////
403
404////////////////////////////////////////////////////////////////////////////////
405/// Static function that returns the parallel option
406/// (to indicate an additional thread)
407
409{
410 return fgParallel;
411}
412
413////////////////////////////////////////////////////////////////////////////////
414/// Static function that tells whether the multithreading unzipping is activated
415
417{
418 if (fgParallel == kEnable || fgParallel == kForce)
419 return true;
420
421 return false;
422}
423
424////////////////////////////////////////////////////////////////////////////////
425/// Static function that (de)activates multithreading unzipping
426///
427/// The possible options are:
428/// - kEnable _Enable_ it, which causes an automatic detection and launches the
429/// additional thread if the number of cores in the machine is greater than
430/// one
431/// - kDisable _Disable_ will not activate the additional thread.
432/// - kForce _Force_ will start the additional thread even if there is only one
433/// core. The default will be taken as kEnable.
434///
435/// Returns 0 if there was an error, 1 otherwise.
436
438{
441 return 1;
442 }
443 return 0;
444}
445
446////////////////////////////////////////////////////////////////////////////////
447// //
448// From now on we have the methods concerning the unzipping part of the cache //
449// //
450////////////////////////////////////////////////////////////////////////////////
451
452////////////////////////////////////////////////////////////////////////////////
453/// Read the logical record header from the buffer buf.
454/// That must be the pointer tho the header part not the object by itself and
455/// must contain data of at least maxbytes
456/// Returns nread;
457///
458/// In output arguments:
459///
460/// - nbytes : number of bytes in record
461/// if negative, this is a deleted record
462/// if 0, cannot read record, wrong value of argument first
463/// - objlen : uncompressed object size
464/// - keylen : length of logical record header
465///
466/// Note that the arguments objlen and keylen are returned only
467/// if maxbytes >=16
468/// Note: This was adapted from TFile... so some things dont apply
469
470Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
471{
472 Version_t versionkey;
473 Short_t klen;
474 UInt_t datime;
475 Int_t nb = 0, olen;
476 Int_t nread = maxbytes;
477 frombuf(buf, &nb);
478 nbytes = nb;
479 if (nb < 0) return nread;
480 // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
481 const Int_t headerSize = 16;
482 if (nread < headerSize) return nread;
483 frombuf(buf, &versionkey);
484 frombuf(buf, &olen);
485 frombuf(buf, &datime);
486 frombuf(buf, &klen);
487 if (!olen) olen = nbytes - klen;
488 objlen = olen;
489 keylen = klen;
490 return nread;
491}
492
493////////////////////////////////////////////////////////////////////////////////
494/// This will delete the list of buffers that are in the unzipping cache
495/// and will reset certain values in the cache.
496/// This name is ambiguous because the method doesn't reset the whole cache,
497/// only the part related to the unzipping
498/// Note: This method is completely different from TTreeCache::ResetCache(),
499/// in that method we were cleaning the prefetching buffer while here we
500/// delete the information about the unzipped buffers
501
503{
504 // Reset all the lists and wipe all the chunks
505 fCycle++;
507
508 if(fNseekMax < fNseek){
509 if (gDebug > 0)
510 Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
511
514 }
515 fEmpty = true;
516}
517
518////////////////////////////////////////////////////////////////////////////////
519/// This inflates a basket in the cache.. passing the data to a new
520/// buffer that will only wait there to be read...
521/// This function is responsible to update corresponding elements in
522/// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
523/// in fUnzipStatus to exclusively unzip the basket, we must update
524/// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
525/// and fUnzipLen are ready before the main thread fetches the data.
526
528{
529 Int_t myCycle;
530 const Int_t hlen = 128;
531 Int_t objlen = 0, keylen = 0;
532 Int_t nbytes = 0;
533 Int_t readbuf = 0;
534
535 Long64_t rdoffs = 0;
536 Int_t rdlen = 0;
537
538 // To synchronize with the 'paging'
539 myCycle = fCycle;
540 rdoffs = fSeek[index];
541 rdlen = fSeekLen[index];
542
543 Int_t loc = -1;
544 if (!fNseek || fIsLearning) {
545 return 1;
546 }
547
548 if ((myCycle != fCycle) || !fIsTransferred) {
549 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
550 return 1;
551 }
552
553 // Prepare a memory buffer of adequate size
554 char* locbuff = nullptr;
555 if (rdlen > 16384) {
556 locbuff = new char[rdlen];
557 } else if (rdlen * 3 < 16384) {
558 locbuff = new char[rdlen * 2];
559 } else {
560 locbuff = new char[16384];
561 }
562
563 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
564
565 if (readbuf <= 0) {
566 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
567 if (locbuff) delete [] locbuff;
568 return -1;
569 }
570
571 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
572
573 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
574 // If the single unzipped chunk is really too big, reset it to not processable
575 // I.e. mark it as done but set the pointer to 0
576 // This block will be unzipped synchronously in the main thread
577 // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
578 if (len > 4 * fUnzipBufferSize) {
579 if (gDebug > 0)
580 Info("UnzipCache", "Block %d is too big, skipping.", index);
581
582 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
583 if (locbuff) delete [] locbuff;
584 return 0;
585 }
586
587 // Unzip it into a new blk
588 char *ptr = nullptr;
589 Int_t loclen = UnzipBuffer(&ptr, locbuff);
590 if ((loclen > 0) && (loclen == objlen + keylen)) {
591 if ((myCycle != fCycle) || !fIsTransferred) {
592 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
593 if (locbuff) delete [] locbuff;
594 delete [] ptr;
595 return 1;
596 }
597 fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
598 fNUnzip++;
599 } else {
600 fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
601 delete [] ptr;
602 }
603
604 if (locbuff) delete [] locbuff;
605 return 0;
606}
607
608#ifdef R__USE_IMT
609////////////////////////////////////////////////////////////////////////////////
610/// We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total)
611/// to a task. In TTaskGroup, we use TThreadExecutor to do the actual work of unzipping
612/// a group of baskets. The purpose of creating TTaskGroup is to avoid competing with main thread.
613
615{
616 auto mapFunction = [&]() {
617 auto unzipFunction = [&](const std::vector<Int_t> &indices) {
618 // If cache is invalidated and we should return immediately.
619 if (!fIsTransferred) return nullptr;
620
621 for (auto ii : indices) {
622 if(fUnzipState.TryUnzipping(ii)) {
623 Int_t res = UnzipCache(ii);
624 if(res)
625 if (gDebug > 0)
626 Info("UnzipCache", "Unzipping failed or cache is in learning state");
627 }
628 }
629 return nullptr;
630 };
631
632 Int_t accusz = 0;
633 std::vector<std::vector<Int_t>> basketIndices;
634 std::vector<Int_t> indices;
635 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
636 for (Int_t i = 0; i < fNseek; i++) {
637 while (accusz < fUnzipGroupSize) {
638 accusz += fSeekLen[i];
639 indices.push_back(i);
640 i++;
641 if (i >= fNseek) break;
642 }
643 if (i < fNseek) i--;
644 basketIndices.push_back(indices);
645 indices.clear();
646 accusz = 0;
647 }
649 pool.Foreach(unzipFunction, basketIndices);
650 };
651
652 fUnzipTaskGroup = std::make_unique<ROOT::Experimental::TTaskGroup>();
653 fUnzipTaskGroup->Run(mapFunction);
654
655 return 0;
656}
657#endif
658
659////////////////////////////////////////////////////////////////////////////////
660/// We try to read a buffer that has already been unzipped
661/// Returns -1 in case of read failure, 0 in case it's not in the
662/// cache and n>0 in case read from cache (number of bytes copied).
663/// pos and len are the original values as were passed to ReadBuffer
664/// but instead we will return the inflated buffer.
665/// Note!! : If *buf == 0 we will allocate the buffer and it will be the
666/// responsibility of the caller to free it... it is useful for example
667/// to pass it to the creator of TBuffer
668
670{
671 Int_t res = 0;
672 Int_t loc = -1;
673
674 // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
675 // pointer to the original zipped chunk
676 // its index in the original unsorted offsets lists
677 //
678 // Actually there are situations in which copying the buffer is not
679 // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
680 // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
681 // better to be done in the main thread.
682
683 Int_t myCycle = fCycle;
684
685 if (fParallel && !fIsLearning) {
686
687 if(fNseekMax < fNseek){
688 if (gDebug > 0)
689 Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
690
693 }
694
696 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
697
698 // The buffer is, at minimum, in the file cache. We must know its index in the requests list
699 // In order to get its info
700 Int_t seekidx = fSeekIndex[loc];
701
702 do {
703
704 // If the block is ready we get it immediately.
705 // And also we don't have to alloc the blks. This is supposed to be
706 // the main thread of the app.
707 if (fUnzipState.IsUnzipped(seekidx)) {
708 if(!(*buf)) {
709 *buf = fUnzipState.fUnzipChunks[seekidx].get();
710 fUnzipState.fUnzipChunks[seekidx].release();
711 *free = true;
712 } else {
713 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
714 fUnzipState.fUnzipChunks[seekidx].reset();
715 *free = false;
716 }
717
718 fNFound++;
719 return fUnzipState.fUnzipLen[seekidx];
720 }
721
722 // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
723 Int_t reqi = -1;
724
725 if (fUnzipState.IsProgress(seekidx)) {
726 if (fEmpty) {
727 for (Int_t ii = 0; ii < fNseek; ++ii) {
728 Int_t idx = (seekidx + 1 + ii) % fNseek;
729 if (fUnzipState.IsUntouched(idx)) {
730 if(fUnzipState.TryUnzipping(idx)) {
731 reqi = idx;
732 break;
733 }
734 }
735 }
736 if (reqi < 0) {
737 fEmpty = false;
738 } else {
739 UnzipCache(reqi);
740 }
741 }
742
743 if ( myCycle != fCycle ) {
744 if (gDebug > 0)
745 Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
747
748 seekidx = -1;
749 break;
750 }
751 }
752
753 } while (fUnzipState.IsProgress(seekidx));
754
755 // Here the block is not pending. It could be done or aborted or not yet being processed.
756 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
757 if(!(*buf)) {
758 *buf = fUnzipState.fUnzipChunks[seekidx].get();
759 fUnzipState.fUnzipChunks[seekidx].release();
760 *free = true;
761 } else {
762 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
763 fUnzipState.fUnzipChunks[seekidx].reset();
764 *free = false;
765 }
766
767 fNStalls++;
768 return fUnzipState.fUnzipLen[seekidx];
769 } else {
770 // This is a complete miss. We want to avoid the background tasks
771 // to try unzipping this block in the future.
772 fUnzipState.SetMissed(seekidx);
773 }
774 } else {
775 loc = -1;
776 fIsTransferred = false;
777 }
778 }
779
780 if (len > fCompBufferSize) {
781 if(fCompBuffer) delete [] fCompBuffer;
782 fCompBuffer = new char[len];
784 } else {
785 if (fCompBufferSize > len * 4) {
786 if(fCompBuffer) delete [] fCompBuffer;
787 fCompBuffer = new char[len*2];
788 fCompBufferSize = len * 2;
789 }
790 }
791
792 res = 0;
793 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
794 // Cache is invalidated and we need to wait for all unzipping tasks to be finished before fill new baskets in cache.
795#ifdef R__USE_IMT
797 fUnzipTaskGroup->Cancel();
798 fUnzipTaskGroup.reset();
799 }
800#endif
801 {
802 // Fill new baskets into cache.
803 R__LOCKGUARD(fIOMutex.get());
804 fFile->Seek(pos);
806 } // end of lock scope
807#ifdef R__USE_IMT
809 CreateTasks();
810 }
811#endif
812 }
813
814 if (res) res = -1;
815
816 if (!res) {
817 res = UnzipBuffer(buf, fCompBuffer);
818 *free = true;
819 }
820
821 if (!fIsLearning) {
822 fNMissed++;
823 }
824
825 return res;
826}
827
828////////////////////////////////////////////////////////////////////////////////
829/// static function: Sets the unzip relative buffer size
830
832{
833 fgRelBuffSize = relbufferSize;
834}
835
836////////////////////////////////////////////////////////////////////////////////
837/// Sets the size for the unzipping cache... by default it should be
838/// two times the size of the prefetching cache
839
841{
842 fUnzipBufferSize = bufferSize;
843}
844
845////////////////////////////////////////////////////////////////////////////////
846/// Unzips a ROOT specific buffer... by reading the header at the beginning.
847/// returns the size of the inflated buffer or -1 if error
848/// Note!! : If *dest == 0 we will allocate the buffer and it will be the
849/// responsibility of the caller to free it... it is useful for example
850/// to pass it to the creator of TBuffer
851/// src is the original buffer with the record (header+compressed data)
852/// *dest is the inflated buffer (including the header)
853
855{
856 Int_t uzlen = 0;
857 bool alloc = false;
858
859 // Here we read the header of the buffer
860 const Int_t hlen = 128;
861 Int_t nbytes = 0, objlen = 0, keylen = 0;
862 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
863
864 if (!(*dest)) {
865 /* early consistency check */
866 UChar_t *bufcur = (UChar_t *) (src + keylen);
867 Int_t nin, nbuf;
868 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
869 Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
870 uzlen = -1;
871 return uzlen;
872 }
873 Int_t l = keylen + objlen;
874 *dest = new char[l];
875 alloc = true;
876 }
877 // Must unzip the buffer
878 // fSeekPos[ind]; adress of zipped buffer
879 // fSeekLen[ind]; len of the zipped buffer
880 // &fBuffer[fSeekPos[ind]]; memory address
881
882 // This is similar to TBasket::ReadBasketBuffers
883 bool oldCase = objlen == nbytes - keylen
884 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
885 && fFile->GetVersion() <= 30401;
886
887 if (objlen > nbytes-keylen || oldCase) {
888
889 // Copy the key
890 memcpy(*dest, src, keylen);
891 uzlen += keylen;
892
893 char *objbuf = *dest + keylen;
894 UChar_t *bufcur = (UChar_t *) (src + keylen);
895 Int_t nin, nbuf;
896 Int_t nout = 0;
897 Int_t noutot = 0;
898
899 while (true) {
900 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
901 if (hc != 0) break;
902 if (gDebug > 2)
903 Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
904 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
905 if (oldCase && (nin > objlen || nbuf > objlen)) {
906 if (gDebug > 2)
907 Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
908
909 //buffer was very likely not compressed in an old version
910 memcpy(*dest + keylen, src + keylen, objlen);
911 uzlen += objlen;
912 return uzlen;
913 }
914
915 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
916
917 if (gDebug > 2)
918 Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
919 nin, bufcur, nbuf, objbuf, nout);
920
921 if (!nout) break;
922 noutot += nout;
923 if (noutot >= objlen) break;
924 bufcur += nin;
925 objbuf += nout;
926 }
927
928 if (noutot != objlen) {
929 Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
930 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
931 uzlen = -1;
932 if(alloc) delete [] *dest;
933 *dest = nullptr;
934 return uzlen;
935 }
936 uzlen += objlen;
937 } else {
938 memcpy(*dest, src, keylen);
939 uzlen += keylen;
940 memcpy(*dest + keylen, src + keylen, objlen);
941 uzlen += objlen;
942 }
943 return uzlen;
944}
945
946////////////////////////////////////////////////////////////////////////////////
947
949
950 printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
951 printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
952 printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
953 printf("Number of hits: %d\n", fNFound);
954 printf("Number of stalls: %d\n", fNStalls);
955 printf("Number of misses: %d\n", fNMissed);
956
958}
959
960////////////////////////////////////////////////////////////////////////////////
961
963 R__LOCKGUARD(fIOMutex.get());
964 return TTreeCache::ReadBufferExt(buf, pos, len, loc);
965}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
#define b(i)
Definition RSha256.hxx:100
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Int_t
Definition RtypesCore.h:45
unsigned char Byte_t
Definition RtypesCore.h:64
short Version_t
Definition RtypesCore.h:65
unsigned char UChar_t
Definition RtypesCore.h:38
float Float_t
Definition RtypesCore.h:57
short Short_t
Definition RtypesCore.h:39
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:69
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:382
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t dest
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
Int_t gDebug
Definition TROOT.cxx:597
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
#define R__LOCKGUARD(mutex)
#define free
Definition civetweb.c:1539
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A TTree is a list of TBranches.
Definition TBranch.h:93
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
static TClass * Class()
Long64_t * GetTreeOffset() const
Definition TChain.h:121
Int_t GetTreeNumber() const override
Definition TChain.h:120
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
Legacy Code: TEventList is a legacy interface: there will be no bug fi...
Definition TEventList.h:31
virtual bool ContainsRange(Long64_t entrymin, Long64_t entrymax)
Return TRUE if list contains entries from entrymin to entrymax included.
Int_t * fSeekIndex
[fNseek] sorted index table of fSeek
virtual Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc)
Long64_t * fSeekSort
[fNseek] Position on file of buffers to be prefetched (sorted)
Int_t * fSeekLen
[fNseek] Length of buffers to be prefetched
Int_t fNtot
Total size of prefetched blocks.
virtual void Prefetch(Long64_t pos, Int_t len)
Add block of length len at position pos in the list of blocks to be prefetched.
Long64_t * fSeek
[fNseek] Position on file of buffers to be prefetched
Bool_t fIsTransferred
True when fBuffer contains something valid.
TFile * fFile
Pointer to file.
Int_t fNseek
Number of blocks to be prefetched.
virtual Int_t GetBufferSize() const
virtual void Seek(Long64_t offset, ERelativeTo pos=kBeg)
Seek to a specific position in the file. Pos is either kBeg, kCur or kEnd.
Definition TFile.cxx:2271
Int_t GetVersion() const
Definition TFile.h:245
virtual Bool_t ReadBufferAsync(Long64_t offs, Int_t len)
Definition TFile.cxx:5205
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
Definition TFile.cxx:1767
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:84
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:991
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1005
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:979
A TTreeCache which exploits parallelized decompression of its own content.
UnzipState_t fUnzipState
void Init()
Initialization procedure common to all the constructors.
Int_t UnzipCache(Int_t index)
This inflates a basket in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Int_t fNMissed
! number of blocks that were not found in the cache and were unzipped
void UpdateBranches(TTree *tree) override
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t AddBranch(TBranch *b, bool subbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by TBranch::Ge...
void ResetCache() override
This will delete the list of buffers that are in the unzipping cache and will reset certain values in...
void SetEntryRange(Long64_t emin, Long64_t emax) override
Set the minimum and maximum entry number to be processed. This information helps to optimize the numbe...
static bool IsParallelUnzip()
Static function that tells whether the multithreading unzipping is activated.
Int_t fNseekMax
! fNseek can change so we need to know its max size
Int_t fNStalls
! number of hits which caused a stall
static TTreeCacheUnzip::EParUnzipMode fgParallel
Indicate if we want to activate the parallelism.
std::unique_ptr< ROOT::Experimental::TTaskGroup > fUnzipTaskGroup
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option=TTreeCacheUnzip::kEnable)
Static function that (de)activates multithreading unzipping.
bool FillBuffer() override
Fill the cache buffer with the branches in the cache.
Int_t ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) override
Int_t UnzipBuffer(char **dest, char *src)
Unzips a ROOT specific buffer... by reading the header at the beginning.
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
Read the logical record header from the buffer buf.
static void SetUnzipRelBufferSize(Float_t relbufferSize)
static function: Sets the unzip relative buffer size
std::unique_ptr< TMutex > fIOMutex
Int_t fNUnzip
! number of blocks that were unzipped
Int_t CreateTasks()
We create a TTaskGroup and asynchronously map each group of baskets (> 100 kB in total) to a task.
Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, bool *free) override
We try to read a buffer that has already been unzipped. Returns -1 in case of read failure,...
Long64_t fUnzipBufferSize
! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
void SetUnzipBufferSize(Long64_t bufferSize)
Sets the size for the unzipping cache... by default it should be two times the size of the prefetchin...
Int_t fUnzipGroupSize
! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
void StopLearningPhase() override
It's the same as TTreeCache::StopLearningPhase but we guarantee that we start the unzipping just afte...
void Print(Option_t *option="") const override
Print cache statistics.
bool fParallel
Indicate if we want to activate the parallelism (for this instance)
Int_t fNFound
! number of blocks that were found in the cache
static Double_t fgRelBuffSize
This is the percentage of the TTreeCacheUnzip that will be used.
~TTreeCacheUnzip() override
Destructor. (in general called by the TFile destructor)
static EParUnzipMode GetParallelUnzip()
Static function that returns the parallel option (to indicate an additional thread)
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
Int_t SetBufferSize(Long64_t buffersize) override
Change the underlying buffer size of the cache.
Long64_t fEntryMin
! first entry in the cache
Definition TTreeCache.h:38
Long64_t fEntryNext
! next entry number where cache must be filled
Definition TTreeCache.h:41
TTree * GetTree() const
Definition TTreeCache.h:149
bool fIsLearning
! true if cache is in learning mode
Definition TTreeCache.h:54
virtual void SetEntryRange(Long64_t emin, Long64_t emax)
Set the minimum and maximum entry number to be processed. This information helps to optimize the numbe...
Long64_t fEntryMax
! last entry in the cache
Definition TTreeCache.h:39
Long64_t fEntryCurrent
! current lowest entry number in the cache
Definition TTreeCache.h:40
Int_t fNReadPref
Number of blocks that were prefetched.
Definition TTreeCache.h:49
TTree * fTree
! pointer to the current Tree
Definition TTreeCache.h:53
virtual void StopLearningPhase()
This is the counterpart of StartLearningPhase() and can be used to stop the learning phase.
Int_t fNbranches
! Number of branches in the cache
Definition TTreeCache.h:44
void Print(Option_t *option="") const override
Print cache statistics.
Int_t AddBranch(TBranch *b, bool subgbranches=false) override
Add a branch to the list of branches to be stored in the cache. This function is called by the user vi...
TObjArray * fBranches
! List of branches to be stored in the cache
Definition TTreeCache.h:51
Helper class to iterate over cluster of baskets.
Definition TTree.h:270
Long64_t GetNextEntry()
Definition TTree.h:307
A TTree represents a columnar dataset.
Definition TTree.h:79
TEventList * GetEventList() const
Definition TTree.h:513
TClass * IsA() const override
Definition TTree.h:705
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
Long64_t BinarySearch(Long64_t n, const T *array, T value)
Binary search in an array of n values to locate value.
Definition TMathBase.h:347
void Reset(Int_t oldSize, Int_t newSize)
Reset all baskets' state arrays.
void Clear(Int_t size)
Clear all baskets' state arrays.
bool IsUnzipped(Int_t index) const
Check if the basket is unzipped already.
bool IsFinished(Int_t index) const
std::atomic< Byte_t > * fUnzipStatus
! [fNSeek]
void SetUnzipped(Int_t index, char *buf, Int_t len)
std::vector< Int_t > fUnzipLen
! [fNseek] Length of the unzipped buffers
bool TryUnzipping(Int_t index)
Start unzipping the basket if it is untouched yet.
bool IsProgress(Int_t index) const
bool IsUntouched(Int_t index) const
void SetFinished(Int_t index)
Set cache as finished.
std::unique_ptr< char[]> * fUnzipChunks
! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
TLine l
Definition textangle.C:4