Logo ROOT  
Reference Guide
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include "TROOT.h"
29
30using namespace ROOT;
31
32namespace {
33
34/// A cluster of entries
35struct EntryCluster {
36 Long64_t start;
37 Long64_t end;
38};
39
40// note that this routine assumes global entry numbers
41static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryCluster>> &cls)
42{
43 Long64_t last_end = 0ll;
44 for (const auto &fcl : cls) {
45 for (const auto &c : fcl) {
46 if (last_end != c.start)
47 return false;
48 last_end = c.end;
49 }
50 }
51 return true;
52}
53
54/// Take a vector of vectors of EntryClusters (a vector per file), filter the entries according to entryList,
55/// and return a new vector of vectors of EntryClusters where cluster start/end entry numbers have been converted to
56/// TEntryList-local entry numbers.
57///
58/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
59/// ascending order, i.e., for n > m:
60/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
///
/// \param[in] clusters Per-file clusters expressed in global entry numbers; must be sorted and contiguous.
/// \param[in] entryList Entry list used to filter the clusters; must not be empty. Its internal iteration state is
///            modified while the function runs and is reset with a GetEntry(0) call before returning.
/// \param[in] treeNames Name of the tree in each file, same ordering as `fileNames`.
/// \param[in] fileNames Names of the files; only used when `entryList` has sub-lists.
/// \param[in] entriesPerFile Number of entries of each file; only used when `entryList` has sub-lists.
/// \return One vector of clusters per input file (possibly empty), where start/end are indexes into `entryList`.
61static std::vector<std::vector<EntryCluster>>
62ConvertToElistClusters(std::vector<std::vector<EntryCluster>> &&clusters, TEntryList &entryList,
63 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
64 const std::vector<Long64_t> &entriesPerFile)
65{
66 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
67 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
68
 // A TEntryList without sub-lists is treated as already holding global entry numbers
69 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
70 const auto nFiles = clusters.size();
71
72 std::unique_ptr<TChain> chain;
73 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
74 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
75 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
 // In both variants the first argument (the current index into the entry list) is incremented in place.
76 NextFn_t Next;
77 if (listHasGlobalEntryNumbers) {
78 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
79 ++elEntry;
80 return elist.Next();
81 };
82 } else {
83 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
84 chain.reset(new TChain());
85 for (auto i = 0u; i < nFiles; ++i)
86 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
87 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
88 ++elEntry;
89 int treenum = -1;
90 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
91 if (localEntry == -1ll)
92 return localEntry;
 // local entry number + offset of the tree it belongs to = global entry number
93 return localEntry + ch->GetTreeOffset()[treenum];
94 };
95 }
96
97 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
98 // so we can be sure TEntryList::Next will return the correct thing
 // NOTE(review): this first value is not passed through the local-to-global conversion that `Next` applies
 // when the list has sub-lists -- confirm this is intended when the first listed entry is not in the first tree.
99 Long64_t elistEntry = 0ll;
100 Long64_t entry = entryList.GetEntry(elistEntry);
101
102 std::vector<std::vector<EntryCluster>> elistClusters;
103
 // `entry` and `elistEntry` are carried over across clusters and files: the entry list is walked exactly once
104 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
105 std::vector<EntryCluster> elistClustersForFile;
106 for (const auto &c : clusters[fileN]) {
107 if (entry >= c.end || entry == -1ll) // no entrylist entries in this cluster
108 continue;
109 R__ASSERT(entry >= c.start); // current entry should never come before the cluster we are looking at
110 const Long64_t elistRangeStart = elistEntry;
111 // advance entry list until the entrylist entry goes beyond the end of the cluster
112 while (entry < c.end && entry != -1ll)
113 entry = Next(elistEntry, entryList, chain.get());
 // [elistRangeStart, elistEntry) are the entry-list indexes whose entries fall inside cluster `c`
114 elistClustersForFile.emplace_back(EntryCluster{elistRangeStart, elistEntry});
115 }
 // a (possibly empty) vector is stored for every file, so the output keeps one element per input file
116 elistClusters.emplace_back(std::move(elistClustersForFile));
117 }
118
119 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
120 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
121
122 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
123 return elistClusters;
124}
125
126// EntryClusters and number of entries per file
127using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
128
129////////////////////////////////////////////////////////////////////////
130/// Return a vector of cluster boundaries for the given tree and files.
131static ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
132 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile)
133{
134 // Note that as a side-effect of opening all files that are going to be used in the
135 // analysis once, all necessary streamers will be loaded into memory.
137 const auto nFileNames = fileNames.size();
138 std::vector<std::vector<EntryCluster>> clustersPerFile;
139 std::vector<Long64_t> entriesPerFile;
140 entriesPerFile.reserve(nFileNames);
141 Long64_t offset = 0ll;
142 for (auto i = 0u; i < nFileNames; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
145
146 std::unique_ptr<TFile> f(TFile::Open(fileName.c_str())); // need TFile::Open to load plugins if need be
147 if (!f || f->IsZombie()) {
148 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
149 throw std::runtime_error(msg);
150 }
151 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
152
153 if (!t) {
154 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
155 "\" from file \"" + fileName + "\"";
156 throw std::runtime_error(msg);
157 }
158
159 auto clusterIter = t->GetClusterIterator(0);
160 Long64_t start = 0ll, end = 0ll;
161 const Long64_t entries = t->GetEntries();
162 // Iterate over the clusters in the current file
163 std::vector<EntryCluster> clusters;
164 while ((start = clusterIter()) < entries) {
165 end = clusterIter.GetNextEntry();
166 // Add the current file's offset to start and end to make them (chain) global
167 clusters.emplace_back(EntryCluster{start + offset, end + offset});
168 }
169 offset += entries;
170 clustersPerFile.emplace_back(std::move(clusters));
171 entriesPerFile.emplace_back(entries);
172 }
173
174 // Here we "fuse" clusters together if the number of clusters is too big with respect to
175 // the number of slots, otherwise we can incur in an overhead which is big enough
176 // to make parallelisation detrimental to performance.
177 // For example, this is the case when, following a merging of many small files, a file
178 // contains a tree with many entries and with clusters of just a few entries each.
179 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
180 // of files (e.g. 1000 files): the large amount of files might result in a large amount
181 // of tasks, but the elevated concurrency level makes the little synchronization required by
182 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
183 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
184 //
185 // The criterion according to which we fuse clusters together is to have around
186 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
187 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
188
189 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
190 auto clustersPerFileIt = clustersPerFile.begin();
191 auto eventRangesPerFileIt = eventRangesPerFile.begin();
192 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
193 const auto clustersInThisFileSize = clustersPerFileIt->size();
194 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
195 // If the number of clusters is less than maxTasksPerFile
196 // we take the clusters as they are
197 if (nFolds == 0) {
198 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
199 continue;
200 }
201 // Otherwise, we have to merge clusters, distributing the reminder evenly
202 // onto the first clusters
203 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
204 const auto &clustersInThisFile = *clustersPerFileIt;
205 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
206 const auto start = clustersInThisFile[i].start;
207 // We lump together at least nFolds clusters, therefore
208 // we need to jump ahead of nFolds-1.
209 i += (nFolds - 1);
210 // We now add a cluster if we have some reminder left
211 if (nReminderClusters > 0) {
212 i += 1U;
213 nReminderClusters--;
214 }
215 const auto end = clustersInThisFile[i].end;
216 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
217 }
218 }
219
220 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
221}
222
223////////////////////////////////////////////////////////////////////////
224/// Return a vector containing the number of entries of each file of each friend TChain
225static std::vector<std::vector<Long64_t>> GetFriendEntries(const Internal::TreeUtils::RFriendInfo &friendInfo)
226{
227
228 const auto &friendNames = friendInfo.fFriendNames;
229 const auto &friendFileNames = friendInfo.fFriendFileNames;
230 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
231
232 std::vector<std::vector<Long64_t>> friendEntries;
233 const auto nFriends = friendNames.size();
234 for (auto i = 0u; i < nFriends; ++i) {
235 std::vector<Long64_t> nEntries;
236 const auto &thisFriendName = friendNames[i].first;
237 const auto &thisFriendFiles = friendFileNames[i];
238 const auto &thisFriendChainSubNames = friendChainSubNames[i];
239 // If this friend has chain sub names, it means it's a TChain.
240 // In this case, we need to traverse all files that make up the TChain,
241 // retrieve the correct sub tree from each file and store the number of
242 // entries for that sub tree.
243 if (!thisFriendChainSubNames.empty()) {
244 // Traverse together filenames and respective treenames
245 for (auto fileidx = 0u; fileidx < thisFriendFiles.size(); ++fileidx) {
246 std::unique_ptr<TFile> curfile(TFile::Open(thisFriendFiles[fileidx].c_str()));
247 if (!curfile || curfile->IsZombie())
248 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" +
249 thisFriendFiles[fileidx] + "\"");
250 // thisFriendChainSubNames[fileidx] stores the name of the current
251 // subtree in the TChain stored in the current file.
252 TTree *curtree = curfile->Get<TTree>(thisFriendChainSubNames[fileidx].c_str());
253 if (!curtree)
254 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
255 thisFriendChainSubNames[fileidx] + "\" from file \"" +
256 thisFriendFiles[fileidx] + "\"");
257 nEntries.emplace_back(curtree->GetEntries());
258 }
259 // Otherwise, if there are no sub names for the current friend, it means
260 // it's a TTree. We can safely use `thisFriendName` as the name of the tree
261 // to retrieve from the file in `thisFriendFiles`
262 } else {
263 for (const auto &fname : thisFriendFiles) {
264 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
265 if (!f || f->IsZombie())
266 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" + fname + "\"");
267 TTree *t = f->Get<TTree>(thisFriendName.c_str());
268 if (!t)
269 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
270 thisFriendName + "\" from file \"" + fname + "\"");
271 nEntries.emplace_back(t->GetEntries());
272 }
273 }
274 // Store the vector with entries for each file in the current tree/chain.
275 friendEntries.emplace_back(std::move(nEntries));
276 }
277
278 return friendEntries;
279}
280
281} // anonymous namespace
282
283namespace ROOT {
284
286
287namespace Internal {
288
289////////////////////////////////////////////////////////////////////////////////
290/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
291/// \param[in] treeNames Name of the tree for each file in `fileNames`.
292/// \param[in] fileNames Files to be opened.
293/// \param[in] friendInfo Information about TTree friends, if any.
294/// \param[in] nEntries Number of entries to be processed.
295/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
296void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
297 const TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
298 const std::vector<std::vector<Long64_t>> &friendEntries)
299{
300
301 const auto &friendNames = friendInfo.fFriendNames;
302 const auto &friendFileNames = friendInfo.fFriendFileNames;
303 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
304
305 fChain.reset(new TChain());
306 const auto nFiles = fileNames.size();
307 for (auto i = 0u; i < nFiles; ++i) {
308 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
309 }
310 fChain->ResetBit(TObject::kMustCleanup);
311
312 fFriends.clear();
313 const auto nFriends = friendNames.size();
314 for (auto i = 0u; i < nFriends; ++i) {
315 const auto &thisFriendNameAlias = friendNames[i];
316 const auto &thisFriendName = thisFriendNameAlias.first;
317 const auto &thisFriendAlias = thisFriendNameAlias.second;
318 const auto &thisFriendFiles = friendFileNames[i];
319 const auto &thisFriendChainSubNames = friendChainSubNames[i];
320 const auto &thisFriendEntries = friendEntries[i];
321
322 // Build a friend chain
323 auto frChain = std::make_unique<TChain>(thisFriendName.c_str());
324 const auto nFileNames = friendFileNames[i].size();
325 if (thisFriendChainSubNames.empty()) {
326 // If there are no chain subnames, the friend was a TTree. It's safe
327 // to add to the chain the filename directly.
328 for (auto j = 0u; j < nFileNames; ++j) {
329 frChain->Add(thisFriendFiles[j].c_str(), thisFriendEntries[j]);
330 }
331 } else {
332 // Otherwise, the new friend chain needs to be built using the nomenclature
333 // "filename/treename" as argument to `TChain::Add`
334 for (auto j = 0u; j < nFileNames; ++j) {
335 frChain->Add((thisFriendFiles[j] + "?#" + thisFriendChainSubNames[j]).c_str(), thisFriendEntries[j]);
336 }
337 }
338
339 // Make it friends with the main chain
340 fChain->AddFriend(frChain.get(), thisFriendAlias.c_str());
341 fFriends.emplace_back(std::move(frChain));
342 }
343}
344
345//////////////////////////////////////////////////////////////////////////
346/// Get a TTreeReader for the current tree of this view.
347std::unique_ptr<TTreeReader>
348TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
349 const std::vector<std::string> &fileNames, const TreeUtils::RFriendInfo &friendInfo,
350 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
351 const std::vector<std::vector<Long64_t>> &friendEntries)
352{
353 const bool hasEntryList = entryList.GetN() > 0;
354 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
355 const bool needNewChain =
356 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
357 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
358 if (needNewChain) {
359 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
360 if (hasEntryList) {
361 fEntryList.reset(new TEntryList(entryList));
362 if (fEntryList->GetLists() != nullptr) {
363 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
364 // sub-lists of the former...
365 fChain->SetEntryList(fEntryList.get());
366 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
367 }
368 }
369 }
370 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
371 reader->SetEntriesRange(start, end);
372 return reader;
373}
374
375} // namespace Internal
376} // namespace ROOT
377
378/////////////////////////////////////////////////////////////////////////////////////////////////
379/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
380std::vector<std::string> TTreeProcessorMT::FindTreeNames()
381{
382 std::vector<std::string> treeNames;
383
384 if (fFileNames.empty()) // This can never happen
385 throw std::runtime_error("Empty list of files and no tree name provided");
386
388 for (const auto &fname : fFileNames) {
389 std::string treeName;
390 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
391 TIter next(f->GetListOfKeys());
392 while (auto *key = static_cast<TKey *>(next())) {
393 const char *className = key->GetClassName();
394 if (strcmp(className, "TTree") == 0) {
395 treeName = key->GetName();
396 break;
397 }
398 }
399 if (treeName.empty())
400 throw std::runtime_error("Cannot find any tree in file " + fname);
401 treeNames.emplace_back(std::move(treeName));
402 }
403
404 return treeNames;
405}
406
407////////////////////////////////////////////////////////////////////////
408/// Constructor based on a file name.
409/// \param[in] filename Name of the file containing the tree to process.
410/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
411/// for a TTree key in the file and will use the first one it finds.
412/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
413/// the same as for TThreadExecutor.
415 : fFileNames({std::string(filename)}),
416 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
417 fPool(nThreads)
418{
420}
421
/// Convert a non-empty collection of string views into owning strings, preserving the order.
/// \param[in] views The views to convert.
/// \return A vector with one std::string per element of `views`.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   const auto nViews = views.size();
   if (nViews == 0u)
      throw std::runtime_error("The provided list of file names is empty");

   std::vector<std::string> converted;
   converted.reserve(nViews);
   for (auto viewIt = views.cbegin(); viewIt != views.cend(); ++viewIt)
      converted.emplace_back(*viewIt);
   return converted;
}
433
434////////////////////////////////////////////////////////////////////////
435/// Constructor based on a collection of file names.
436/// \param[in] filenames Collection of the names of the files containing the tree to process.
437/// \param[in] treename Name of the tree to process. If not provided, the implementation will
438/// search filenames for a TTree key and will use the first one it finds in each file.
439/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
440/// the same as for TThreadExecutor.
/// \throws std::runtime_error if `filenames` is empty (see CheckAndConvert) or, when `treename` is empty,
/// if FindTreeNames cannot determine a tree name for one of the files.
441///
442/// If different files contain TTrees with different names and automatic TTree name detection is not an option
443/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
444/// it to the appropriate TTreeProcessorMT constructor.
445TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
446 UInt_t nThreads)
447 : fFileNames(CheckAndConvert(filenames)),
 // FindTreeNames() and fFileNames.size() both read fFileNames: this relies on fFileNames being declared
 // (hence initialized) before fTreeNames in the class -- TODO confirm in the header.
448 fTreeNames(treename.empty() ? FindTreeNames()
449 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
450 fFriendInfo(), fPool(nThreads)
451{
 // NOTE(review): the embedded numbering of this listing jumps from 451 to 453, so the body may have lost a
 // statement in extraction -- compare against the upstream file.
453}
454
455////////////////////////////////////////////////////////////////////////
456/// Constructor based on a TTree and a TEntryList.
457/// \param[in] tree Tree or chain of files containing the tree to process.
458/// \param[in] entries List of entry numbers to process.
459/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
460/// the same as for TThreadExecutor.
462 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
463 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)), fEntryList(entries),
464 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree)), fPool(nThreads)
465{
467}
468
469////////////////////////////////////////////////////////////////////////
470/// Constructor based on a TTree.
471/// \param[in] tree Tree or chain of files containing the tree to process.
472/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
473/// the same as for TThreadExecutor.
475
476//////////////////////////////////////////////////////////////////////////////
477/// Process the entries of a TTree in parallel. The user-provided function
478/// receives a TTreeReader which can be used to iterate on a subrange of
479/// entries
480/// ~~~{.cpp}
481/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
482/// // Select branches to read
483/// while (readerSubRange.Next()) {
484/// // Use content of current entry
485/// }
486/// });
487/// ~~~
488/// The user needs to be aware that each of the subranges can potentially
489/// be processed in parallel. This means that the code of the user function
490/// should be thread safe.
491///
492/// \param[in] func User-defined function that processes a subrange of entries
494{
495 // compute number of tasks per file
496 const unsigned int maxTasksPerFile =
497 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
498
499 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
500 // so we do it here for all files.
501 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
502 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
503 // sub-entrylists.
504 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
505 const bool hasEntryList = fEntryList.GetN() > 0;
506 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
507 ClustersAndEntries clusterAndEntries{};
508 if (shouldRetrieveAllClusters) {
509 clusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile);
510 if (hasEntryList)
511 clusterAndEntries.first = ConvertToElistClusters(std::move(clusterAndEntries.first), fEntryList, fTreeNames,
512 fFileNames, clusterAndEntries.second);
513 }
514
515 const auto &clusters = clusterAndEntries.first;
516 const auto &entries = clusterAndEntries.second;
517
518 // Retrieve number of entries for each file for each friend tree
519 const auto friendEntries = hasFriends ? GetFriendEntries(fFriendInfo) : std::vector<std::vector<Long64_t>>{};
520
521 // Parent task, spawns tasks that process each of the entry clusters for each input file
522 // TODO: for readability we should have two versions of this lambda, for shouldRetrieveAllClusters == true/false
523 auto processFile = [&](std::size_t fileIdx) {
524 // theseFiles contains either all files or just the single file to process
525 const auto &theseFiles = shouldRetrieveAllClusters ? fFileNames : std::vector<std::string>({fFileNames[fileIdx]});
526 // either all tree names or just the single tree to process
527 const auto &theseTrees = shouldRetrieveAllClusters ? fTreeNames : std::vector<std::string>({fTreeNames[fileIdx]});
528 // Evaluate clusters (with local entry numbers) and number of entries for this file, if needed
529 const auto theseClustersAndEntries =
530 shouldRetrieveAllClusters ? ClustersAndEntries{} : MakeClusters(theseTrees, theseFiles, maxTasksPerFile);
531
532 // All clusters for the file to process, either with global or local entry numbers
533 const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];
534
535 // Either all number of entries or just the ones for this file
536 const auto &theseEntries =
537 shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});
538
539 auto processCluster = [&](const EntryCluster &c) {
540 auto r = fTreeView->GetTreeReader(c.start, c.end, theseTrees, theseFiles, fFriendInfo, fEntryList,
541 theseEntries, friendEntries);
542 func(*r);
543 };
544
545 fPool.Foreach(processCluster, thisFileClusters);
546 };
547
548 std::vector<std::size_t> fileIdxs(fFileNames.size());
549 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
550
551 fPool.Foreach(processFile, fileIdxs);
552}
553
554////////////////////////////////////////////////////////////////////////
555/// \brief Retrieve the current value for the desired number of tasks per worker.
556/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
558{
560}
561
562////////////////////////////////////////////////////////////////////////
563/// \brief Set the hint for the desired number of tasks created per worker.
564/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
565///
566/// This allows creating a reasonable number of tasks even if any of the
567/// processed files features a bad clustering, for example with a lot of
568/// entries and just a few entries per cluster, or to limit the number of
569/// tasks spawned when a very large number of files and workers is used.
///
/// The value is stored as is in fgTasksPerWorkerHint; no validation is performed here.
570void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
571{
572 fgTasksPerWorkerHint = tasksPerWorkerHint;
573}
ROOT::R::TRInterface & r
Definition: Object.C:4
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
unsigned int UInt_t
Definition: RtypesCore.h:46
long long Long64_t
Definition: RtypesCore.h:80
#define gDirectory
Definition: TDirectory.h:348
#define R__ASSERT(e)
Definition: TError.h:118
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Get a TTreeReader for the current tree of this view.
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A class to process the entries of a TTree in parallel.
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
const Internal::TreeUtils::RFriendInfo fFriendInfo
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u)
Constructor based on a file name.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition: TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual TList * GetLists() const
Definition: TEntryList.h:75
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
Definition: TEntryList.cxx:740
virtual Long64_t GetN() const
Definition: TEntryList.h:77
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:4011
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:28
@ kCanDelete
if object in a list can be deleted
Definition: TObject.h:58
@ kMustCleanup
if object destructor must call RecursiveRemove()
Definition: TObject.h:60
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
A TTree represents a columnar dataset.
Definition: TTree.h:79
virtual Long64_t GetEntries() const
Definition: TTree.h:459
Different standalone functions to work with trees and tuples, not required to be a member of any cla...
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
Retrieve the full path(s) to a TTree or the trees in a TChain.
RFriendInfo GetFriendInfo(const TTree &tree)
Get and store the names, aliases and file names of the direct friends of the tree.
std::vector< std::string > GetFileNamesFromTree(const TTree &tree)
Get and store the file names associated with the input tree.
basic_string_view< char > string_view
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:150
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition: TROOT.cxx:493
Definition: tree.py:1
Information about friend trees of a certain TTree or TChain object.
std::vector< NameAlias > fFriendNames
Pairs of names and aliases of friend trees/chains.
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
std::vector< std::vector< std::string > > fFriendChainSubNames
Names of the subtrees of a friend TChain.