1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessor::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
27#include "TROOT.h"
30using namespace ROOT;
32namespace {
34/// A cluster of entries
35struct EntryCluster {
36 Long64_t start;
37 Long64_t end;
40// note that this routine assumes global entry numbers
41static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryCluster>> &cls)
43 Long64_t last_end = 0ll;
44 for (const auto &fcl : cls) {
45 for (const auto &c : fcl) {
46 if (last_end != c.start)
47 return false;
48 last_end = c.end;
49 }
50 }
51 return true;
54/// Take a vector of vectors of EntryClusters (a vector per file), filter the entries according to entryList, and
55/// and return a new vector of vectors of EntryClusters where cluster start/end entry numbers have been converted to
56/// TEntryList-local entry numbers.
58/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
59/// ascending order, i.e., for n > m:
60/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
61static std::vector<std::vector<EntryCluster>>
62ConvertToElistClusters(std::vector<std::vector<EntryCluster>> &&clusters, TEntryList &entryList,
63 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
64 const std::vector<Long64_t> &entriesPerFile)
66 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
67 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
69 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
70 const auto nFiles = clusters.size();
72 std::unique_ptr<TChain> chain;
73 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
74 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
75 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
76 NextFn_t Next;
77 if (listHasGlobalEntryNumbers) {
78 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
79 ++elEntry;
80 return elist.Next();
81 };
82 } else {
83 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
84 chain.reset(new TChain());
85 for (auto i = 0u; i < nFiles; ++i)
86 chain->Add((fileNames[i] + "?query#" + treeNames[i]).c_str(), entriesPerFile[i]);
87 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
88 ++elEntry;
89 int treenum = -1;
90 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
91 if (localEntry == -1ll)
92 return localEntry;
93 return localEntry + ch->GetTreeOffset()[treenum];
94 };
95 }
97 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
98 // so we can be sure TEntryList::Next will return the correct thing
99 Long64_t elistEntry = 0ll;
100 Long64_t entry = entryList.GetEntry(elistEntry);
102 std::vector<std::vector<EntryCluster>> elistClusters;
104 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
105 std::vector<EntryCluster> elistClustersForFile;
106 for (const auto &c : clusters[fileN]) {
107 if (entry >= c.end || entry == -1ll) // no entrylist entries in this cluster
108 continue;
109 R__ASSERT(entry >= c.start); // current entry should never come before the cluster we are looking at
110 const Long64_t elistRangeStart = elistEntry;
111 // advance entry list until the entrylist entry goes beyond the end of the cluster
112 while (entry < c.end && entry != -1ll)
113 entry = Next(elistEntry, entryList, chain.get());
114 elistClustersForFile.emplace_back(EntryCluster{elistRangeStart, elistEntry});
115 }
116 elistClusters.emplace_back(std::move(elistClustersForFile));
117 }
119 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
120 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
122 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
123 return elistClusters;
126// EntryClusters and number of entries per file
127using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
130/// Return a vector of cluster boundaries for the given tree and files.
131static ClustersAndEntries
132MakeClusters(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile)
134 // Note that as a side-effect of opening all files that are going to be used in the
135 // analysis once, all necessary streamers will be loaded into memory.
137 const auto nFileNames = fileNames.size();
138 std::vector<std::vector<EntryCluster>> clustersPerFile;
139 std::vector<Long64_t> entriesPerFile;
140 entriesPerFile.reserve(nFileNames);
141 Long64_t offset = 0ll;
142 for (auto i = 0u; i < nFileNames; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
146 std::unique_ptr<TFile> f(TFile::Open(fileName.c_str())); // need TFile::Open to load plugins if need be
147 if (!f || f->IsZombie()) {
148 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
149 throw std::runtime_error(msg);
150 }
151 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
153 if (!t) {
154 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
155 "\" from file \"" + fileName + "\"";
156 throw std::runtime_error(msg);
157 }
159 auto clusterIter = t->GetClusterIterator(0);
160 Long64_t start = 0ll, end = 0ll;
161 const Long64_t entries = t->GetEntries();
162 // Iterate over the clusters in the current file
163 std::vector<EntryCluster> clusters;
164 while ((start = clusterIter()) < entries) {
165 end = clusterIter.GetNextEntry();
166 // Add the current file's offset to start and end to make them (chain) global
167 clusters.emplace_back(EntryCluster{start + offset, end + offset});
168 }
169 offset += entries;
170 clustersPerFile.emplace_back(std::move(clusters));
171 entriesPerFile.emplace_back(entries);
172 }
174 // Here we "fuse" clusters together if the number of clusters is too big with respect to
175 // the number of slots, otherwise we can incur in an overhead which is big enough
176 // to make parallelisation detrimental to performance.
177 // For example, this is the case when, following a merging of many small files, a file
178 // contains a tree with many entries and with clusters of just a few entries each.
179 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
180 // of files (e.g. 1000 files): the large amount of files might result in a large amount
181 // of tasks, but the elevated concurrency level makes the little synchronization required by
182 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
183 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
184 //
185 // The criterion according to which we fuse clusters together is to have around
186 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
187 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
189 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
190 auto clustersPerFileIt = clustersPerFile.begin();
191 auto eventRangesPerFileIt = eventRangesPerFile.begin();
192 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
193 const auto clustersInThisFileSize = clustersPerFileIt->size();
194 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
195 // If the number of clusters is less than maxTasksPerFile
196 // we take the clusters as they are
197 if (nFolds == 0) {
198 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
199 continue;
200 }
201 // Otherwise, we have to merge clusters, distributing the reminder evenly
202 // onto the first clusters
203 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
204 const auto &clustersInThisFile = *clustersPerFileIt;
205 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
206 const auto start = clustersInThisFile[i].start;
207 // We lump together at least nFolds clusters, therefore
208 // we need to jump ahead of nFolds-1.
209 i += (nFolds - 1);
210 // We now add a cluster if we have some reminder left
211 if (nReminderClusters > 0) {
212 i += 1U;
213 nReminderClusters--;
214 }
215 const auto end = clustersInThisFile[i].end;
216 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
217 }
218 }
220 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
224/// Return a vector containing the number of entries of each file of each friend TChain
225static std::vector<std::vector<Long64_t>>
226GetFriendEntries(const std::vector<std::pair<std::string, std::string>> &friendNames,
227 const std::vector<std::vector<std::string>> &friendFileNames)
229 std::vector<std::vector<Long64_t>> friendEntries;
230 const auto nFriends = friendNames.size();
231 for (auto i = 0u; i < nFriends; ++i) {
232 std::vector<Long64_t> nEntries;
233 const auto &thisFriendName = friendNames[i].first;
234 const auto &thisFriendFiles = friendFileNames[i];
235 for (const auto &fname : thisFriendFiles) {
236 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
237 TTree *t = nullptr; // owned by TFile
238 f->GetObject(thisFriendName.c_str(), t);
239 nEntries.emplace_back(t->GetEntries());
240 }
241 friendEntries.emplace_back(std::move(nEntries));
242 }
244 return friendEntries;
248/// Return the full path of the TTree or the trees in the TChain
249static std::vector<std::string> GetTreeFullPaths(const TTree &tree)
251 // Case 1: this is a TChain. For each file it contains, GetName returns the name of the tree in that file
252 if (tree.IsA() == TChain::Class()) {
253 auto &chain = static_cast<const TChain &>(tree);
254 auto files = chain.GetListOfFiles();
255 if (!files || files->GetEntries() == 0) {
256 throw std::runtime_error("TTreeProcessorMT: input TChain does not contain any file");
257 }
258 std::vector<std::string> treeNames;
259 for (TObject *f : *files)
260 treeNames.emplace_back(f->GetName());
262 return treeNames;
263 }
265 // Case 2: this is a TTree: we get the full path of it
266 if (auto motherDir = tree.GetDirectory()) {
267 // We have 2 subcases (ROOT-9948):
268 // - 1. motherDir is a TFile
269 // - 2. motherDir is a directory
270 // If 1. we just return the name of the tree, if 2. we reconstruct the path
271 // to the file.
272 if (motherDir->InheritsFrom("TFile")) {
273 return {tree.GetName()};
274 }
275 std::string fullPath = motherDir->GetPath(); // e.g. "file.root:/dir"
276 fullPath = fullPath.substr(fullPath.find(":/") + 1); // e.g. "/dir"
277 fullPath += "/";
278 fullPath += tree.GetName(); // e.g. "/dir/tree"
279 return {fullPath};
280 }
282 // We do our best and return the name of the tree
283 return {tree.GetName()};
286} // anonymous namespace
288namespace ROOT {
294namespace Internal {
297/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
298/// \param[in] treeNames Name of the tree for each file in `fileNames`.
299/// \param[in] fileNames Files to be opened.
300/// \param[in] friendInfo Information about TTree friends, if any.
301/// \param[in] nEntries Number of entries to be processed.
302/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
303void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
304 const FriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
305 const std::vector<std::vector<Long64_t>> &friendEntries)
307 const std::vector<NameAlias> &friendNames = friendInfo.fFriendNames;
308 const std::vector<std::vector<std::string>> &friendFileNames = friendInfo.fFriendFileNames;
310 fChain.reset(new TChain());
311 const auto nFiles = fileNames.size();
312 for (auto i = 0u; i < nFiles; ++i) {
313 fChain->Add((fileNames[i] + "?query#" + treeNames[i]).c_str(), nEntries[i]);
314 }
315 fChain->ResetBit(TObject::kMustCleanup);
317 fFriends.clear();
318 const auto nFriends = friendNames.size();
319 for (auto i = 0u; i < nFriends; ++i) {
320 const auto &friendName = friendNames[i];
321 const auto &name = friendName.first;
322 const auto &alias = friendName.second;
324 // Build a friend chain
325 auto frChain = std::make_unique<TChain>(name.c_str());
326 const auto nFileNames = friendFileNames[i].size();
327 for (auto j = 0u; j < nFileNames; ++j)
328 frChain->Add(friendFileNames[i][j].c_str(), friendEntries[i][j]);
330 // Make it friends with the main chain
331 fChain->AddFriend(frChain.get(), alias.c_str());
332 fFriends.emplace_back(std::move(frChain));
333 }
337/// Get a TTreeReader for the current tree of this view.
339TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
340 const std::vector<std::string> &fileNames, const FriendInfo &friendInfo,
341 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
342 const std::vector<std::vector<Long64_t>> &friendEntries)
344 const bool hasEntryList = entryList.GetN() > 0;
345 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
346 const bool needNewChain =
347 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
348 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
349 if (needNewChain) {
350 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
351 if (hasEntryList) {
352 fEntryList.reset(new TEntryList(entryList));
353 if (fEntryList->GetLists() != nullptr) {
354 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
355 // sub-lists of the former...
356 fChain->SetEntryList(fEntryList.get());
357 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
358 }
359 }
360 }
361 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
362 reader->SetEntriesRange(start, end);
363 return reader;
366} // namespace Internal
367} // namespace ROOT
370/// Get and store the names, aliases and file names of the friends of the tree.
371/// \param[in] tree The main tree whose friends to
373/// Note that "friends of friends" and circular references in the lists of friends are not supported.
376 std::vector<Internal::NameAlias> friendNames;
377 std::vector<std::vector<std::string>> friendFileNames;
379 // Typically, the correct way to call GetListOfFriends would be `tree.GetTree()->GetListOfFriends()`
380 // (see e.g. the discussion at https://github.com/root-project/root/issues/6741).
381 // However, in this case, in case we are dealing with a TChain we really only care about the TChain's
382 // list of friends (which will need to be rebuilt in each processing task) while friends of the TChain's
383 // internal TTree, if any, will be automatically loaded in each task just like they would be automatically
384 // loaded here if we used tree.GetTree()->GetListOfFriends().
385 const auto friends = tree.GetListOfFriends();
386 if (!friends)
387 return Internal::FriendInfo();
389 for (auto fr : *friends) {
390 const auto frTree = static_cast<TFriendElement *>(fr)->GetTree();
391 const bool isChain = frTree->IsA() == TChain::Class();
393 friendFileNames.emplace_back();
394 auto &fileNames = friendFileNames.back();
396 // Check if friend tree/chain has an alias
397 const auto alias_c = tree.GetFriendAlias(frTree);
398 const std::string alias = alias_c != nullptr ? alias_c : "";
400 if (isChain) {
401 // Note that each TChainElement returned by chain.GetListOfFiles has a name
402 // equal to the tree name of this TChain and a title equal to the filename.
403 // Accessing the information like this ensures that we get the correct
404 // filenames and treenames if the treename is given as part of the filename
405 // via chain.AddFile(file.root/myTree) and as well if the tree name is given
406 // in the constructor via TChain(myTree) and a file is added later by chain.AddFile(file.root).
408 // Get name of the trees building the chain
409 const auto chainFiles = static_cast<TChain*>(frTree)->GetListOfFiles();
410 const auto realName = chainFiles->First()->GetName();
411 friendNames.emplace_back(std::make_pair(realName, alias));
412 // Get filenames stored in the title member
413 for (auto f : *chainFiles) {
414 fileNames.emplace_back(f->GetTitle());
415 }
416 } else {
417 // Get name of the tree
418 const auto realName = GetTreeFullPaths(*frTree)[0];
419 friendNames.emplace_back(std::make_pair(realName, alias));
421 // Get filename
422 const auto f = frTree->GetCurrentFile();
423 if (!f)
424 throw std::runtime_error("Friend trees with no associated file are not supported.");
425 fileNames.emplace_back(f->GetName());
426 }
427 }
429 return Internal::FriendInfo{std::move(friendNames), std::move(friendFileNames)};
433/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
434std::vector<std::string> TTreeProcessorMT::FindTreeNames()
436 std::vector<std::string> treeNames;
438 if (fFileNames.empty()) // This can never happen
439 throw std::runtime_error("Empty list of files and no tree name provided");
442 for (const auto &fname : fFileNames) {
443 std::string treeName;
444 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
445 TIter next(f->GetListOfKeys());
446 while (auto *key = static_cast<TKey *>(next())) {
447 const char *className = key->GetClassName();
448 if (strcmp(className, "TTree") == 0) {
449 treeName = key->GetName();
450 break;
451 }
452 }
453 if (treeName.empty())
454 throw std::runtime_error("Cannot find any tree in file " + fname);
455 treeNames.emplace_back(std::move(treeName));
456 }
458 return treeNames;
462/// Constructor based on a file name.
463/// \param[in] filename Name of the file containing the tree to process.
464/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
465/// for a TTree key in the file and will use the first one it finds.
466/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
467/// the same as for TThreadExecutor.
468TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename, UInt_t nThreads)
469 : fFileNames({std::string(filename)}),
470 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
471 fPool(nThreads)
476std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
478 if (views.empty())
479 throw std::runtime_error("The provided list of file names is empty");
481 std::vector<std::string> strings;
482 strings.reserve(views.size());
483 for (const auto &v : views)
484 strings.emplace_back(v);
485 return strings;
489/// Constructor based on a collection of file names.
490/// \param[in] filenames Collection of the names of the files containing the tree to process.
491/// \param[in] treename Name of the tree to process. If not provided, the implementation will
492/// search filenames for a TTree key and will use the first one it finds in each file.
493/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
494/// the same as for TThreadExecutor.
496/// If different files contain TTrees with different names and automatic TTree name detection is not an option
497/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
498/// it to the appropriate TTreeProcessorMT constructor.
499TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
500 UInt_t nThreads)
501 : fFileNames(CheckAndConvert(filenames)),
502 fTreeNames(treename.empty() ? FindTreeNames()
503 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
504 fFriendInfo(), fPool(nThreads)
509std::vector<std::string> GetFilesFromTree(TTree &tree)
511 std::vector<std::string> filenames;
513 const bool isChain = tree.IsA() == TChain::Class();
514 if (isChain) {
515 TObjArray *filelist = static_cast<TChain &>(tree).GetListOfFiles();
516 const auto nFiles = filelist->GetEntries();
517 if (nFiles == 0)
518 throw std::runtime_error("The provided chain of files is empty");
519 filenames.reserve(nFiles);
520 for (auto f : *filelist)
521 filenames.emplace_back(f->GetTitle());
522 } else {
523 TFile *f = tree.GetCurrentFile();
524 if (!f) {
525 const auto msg = "The specified TTree is not linked to any file, in-memory-only trees are not supported.";
526 throw std::runtime_error(msg);
527 }
529 filenames.emplace_back(f->GetName());
530 }
532 return filenames;
536/// Constructor based on a TTree and a TEntryList.
537/// \param[in] tree Tree or chain of files containing the tree to process.
538/// \param[in] entries List of entry numbers to process.
539/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
540/// the same as for TThreadExecutor.
542 : fFileNames(GetFilesFromTree(tree)), fTreeNames(GetTreeFullPaths(tree)), fEntryList(entries),
543 fFriendInfo(GetFriendInfo(tree)), fPool(nThreads)
549/// Constructor based on a TTree.
550/// \param[in] tree Tree or chain of files containing the tree to process.
551/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
552/// the same as for TThreadExecutor.
558/// Process the entries of a TTree in parallel. The user-provided function
559/// receives a TTreeReader which can be used to iterate on a subrange of
560/// entries
561/// ~~~{.cpp}
562/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
563/// // Select branches to read
564/// while (readerSubRange.Next()) {
565/// // Use content of current entry
566/// }
567/// });
568/// ~~~
569/// The user needs to be aware that each of the subranges can potentially
570/// be processed in parallel. This means that the code of the user function
571/// should be thread safe.
573/// \param[in] func User-defined function that processes a subrange of entries
574void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
576 const std::vector<Internal::NameAlias> &friendNames = fFriendInfo.fFriendNames;
577 const std::vector<std::vector<std::string>> &friendFileNames = fFriendInfo.fFriendFileNames;
579 // compute number of tasks per file
580 const unsigned int maxTasksPerFile =
581 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
583 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
584 // so we do it here for all files.
585 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
586 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
587 // sub-entrylists.
588 const bool hasFriends = !friendNames.empty();
589 const bool hasEntryList = fEntryList.GetN() > 0;
590 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
591 ClustersAndEntries clusterAndEntries{};
592 if (shouldRetrieveAllClusters) {
593 clusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile);
594 if (hasEntryList)
595 clusterAndEntries.first = ConvertToElistClusters(std::move(clusterAndEntries.first), fEntryList, fTreeNames,
596 fFileNames, clusterAndEntries.second);
597 }
599 const auto &clusters = clusterAndEntries.first;
600 const auto &entries = clusterAndEntries.second;
602 // Retrieve number of entries for each file for each friend tree
603 const auto friendEntries =
604 hasFriends ? GetFriendEntries(friendNames, friendFileNames) : std::vector<std::vector<Long64_t>>{};
606 // Parent task, spawns tasks that process each of the entry clusters for each input file
607 // TODO: for readability we should have two versions of this lambda, for shouldRetrieveAllClusters == true/false
608 auto processFile = [&](std::size_t fileIdx) {
609 // theseFiles contains either all files or just the single file to process
610 const auto &theseFiles = shouldRetrieveAllClusters ? fFileNames : std::vector<std::string>({fFileNames[fileIdx]});
611 // either all tree names or just the single tree to process
612 const auto &theseTrees = shouldRetrieveAllClusters ? fTreeNames : std::vector<std::string>({fTreeNames[fileIdx]});
613 // Evaluate clusters (with local entry numbers) and number of entries for this file, if needed
614 const auto theseClustersAndEntries =
615 shouldRetrieveAllClusters ? ClustersAndEntries{} : MakeClusters(theseTrees, theseFiles, maxTasksPerFile);
617 // All clusters for the file to process, either with global or local entry numbers
618 const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];
620 // Either all number of entries or just the ones for this file
621 const auto &theseEntries =
622 shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});
624 auto processCluster = [&](const EntryCluster &c) {
625 auto r = fTreeView->GetTreeReader(c.start, c.end, theseTrees, theseFiles, fFriendInfo, fEntryList,
626 theseEntries, friendEntries);
627 func(*r);
628 };
630 fPool.Foreach(processCluster, thisFileClusters);
631 };
633 std::vector<std::size_t> fileIdxs(fFileNames.size());
634 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
636 fPool.Foreach(processFile, fileIdxs);
640/// \brief This function is deprecated in favor of GetTasksPerWorkerHint().
647/// \brief Retrieve the current value for the desired number of tasks per worker.
648/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
655/// \brief This function is deprecated in favor of SetTasksPerWorkerHint().
656void TTreeProcessorMT::SetMaxTasksPerFilePerWorker(unsigned int maxTasksPerFile)
658 fgMaxTasksPerFilePerWorker = maxTasksPerFile;
662/// \brief Set the hint for the desired number of tasks created per worker.
663/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
665/// This allows to create a reasonable number of tasks even if any of the
666/// processed files features a bad clustering, for example with a lot of
667/// entries and just a few entries per cluster, or to limit the number of
668/// tasks spawned when a very large number of files and workers is used.
669void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
671 fgTasksPerWorkerHint = tasksPerWorkerHint;
