Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include "TROOT.h"
29
30using namespace ROOT;
31
32namespace {
33
34/// A cluster of entries
35struct EntryCluster {
36 Long64_t start;
37 Long64_t end;
38};
39
40// note that this routine assumes global entry numbers
41static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryCluster>> &cls)
42{
43 Long64_t last_end = 0ll;
44 for (const auto &fcl : cls) {
45 for (const auto &c : fcl) {
46 if (last_end != c.start)
47 return false;
48 last_end = c.end;
49 }
50 }
51 return true;
52}
53
54/// Take a vector of vectors of EntryClusters (a vector per file), filter the entries according to entryList, and
55/// and return a new vector of vectors of EntryClusters where cluster start/end entry numbers have been converted to
56/// TEntryList-local entry numbers.
57///
58/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
59/// ascending order, i.e., for n > m:
60/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
61static std::vector<std::vector<EntryCluster>>
62ConvertToElistClusters(std::vector<std::vector<EntryCluster>> &&clusters, TEntryList &entryList,
63 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
64 const std::vector<Long64_t> &entriesPerFile)
65{
66 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
67 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
68
69 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
70 const auto nFiles = clusters.size();
71
72 std::unique_ptr<TChain> chain;
73 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
74 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
75 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
76 NextFn_t Next;
77 if (listHasGlobalEntryNumbers) {
78 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
79 ++elEntry;
80 return elist.Next();
81 };
82 } else {
83 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
84 chain.reset(new TChain());
85 for (auto i = 0u; i < nFiles; ++i)
86 chain->Add((fileNames[i] + "?query#" + treeNames[i]).c_str(), entriesPerFile[i]);
87 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
88 ++elEntry;
89 int treenum = -1;
90 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
91 if (localEntry == -1ll)
92 return localEntry;
93 return localEntry + ch->GetTreeOffset()[treenum];
94 };
95 }
96
97 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
98 // so we can be sure TEntryList::Next will return the correct thing
99 Long64_t elistEntry = 0ll;
100 Long64_t entry = entryList.GetEntry(elistEntry);
101
102 std::vector<std::vector<EntryCluster>> elistClusters;
103
104 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
105 std::vector<EntryCluster> elistClustersForFile;
106 for (const auto &c : clusters[fileN]) {
107 if (entry >= c.end || entry == -1ll) // no entrylist entries in this cluster
108 continue;
109 R__ASSERT(entry >= c.start); // current entry should never come before the cluster we are looking at
110 const Long64_t elistRangeStart = elistEntry;
111 // advance entry list until the entrylist entry goes beyond the end of the cluster
112 while (entry < c.end && entry != -1ll)
113 entry = Next(elistEntry, entryList, chain.get());
114 elistClustersForFile.emplace_back(EntryCluster{elistRangeStart, elistEntry});
115 }
116 elistClusters.emplace_back(std::move(elistClustersForFile));
117 }
118
119 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
120 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
121
122 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
123 return elistClusters;
124}
125
126// Result type of MakeClusters: the EntryClusters of each file (first) and the number of entries of each file (second)
127using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
128
129////////////////////////////////////////////////////////////////////////
130/// Return a vector of cluster boundaries for the given tree and files.
131static ClustersAndEntries
132MakeClusters(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile)
133{
134 // Note that as a side-effect of opening all files that are going to be used in the
135 // analysis once, all necessary streamers will be loaded into memory.
137 const auto nFileNames = fileNames.size();
138 std::vector<std::vector<EntryCluster>> clustersPerFile;
139 std::vector<Long64_t> entriesPerFile;
140 entriesPerFile.reserve(nFileNames);
141 Long64_t offset = 0ll;
142 for (auto i = 0u; i < nFileNames; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
145
146 std::unique_ptr<TFile> f(TFile::Open(fileName.c_str())); // need TFile::Open to load plugins if need be
147 if (!f || f->IsZombie()) {
148 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
149 throw std::runtime_error(msg);
150 }
151 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
152
153 if (!t) {
154 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
155 "\" from file \"" + fileName + "\"";
156 throw std::runtime_error(msg);
157 }
158
159 auto clusterIter = t->GetClusterIterator(0);
160 Long64_t start = 0ll, end = 0ll;
161 const Long64_t entries = t->GetEntries();
162 // Iterate over the clusters in the current file
163 std::vector<EntryCluster> clusters;
164 while ((start = clusterIter()) < entries) {
165 end = clusterIter.GetNextEntry();
166 // Add the current file's offset to start and end to make them (chain) global
167 clusters.emplace_back(EntryCluster{start + offset, end + offset});
168 }
169 offset += entries;
170 clustersPerFile.emplace_back(std::move(clusters));
171 entriesPerFile.emplace_back(entries);
172 }
173
174 // Here we "fuse" clusters together if the number of clusters is too big with respect to
175 // the number of slots, otherwise we can incur in an overhead which is big enough
176 // to make parallelisation detrimental to performance.
177 // For example, this is the case when, following a merging of many small files, a file
178 // contains a tree with many entries and with clusters of just a few entries each.
179 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
180 // of files (e.g. 1000 files): the large amount of files might result in a large amount
181 // of tasks, but the elevated concurrency level makes the little synchronization required by
182 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
183 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
184 //
185 // The criterion according to which we fuse clusters together is to have around
186 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
187 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
188
189 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
190 auto clustersPerFileIt = clustersPerFile.begin();
191 auto eventRangesPerFileIt = eventRangesPerFile.begin();
192 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
193 const auto clustersInThisFileSize = clustersPerFileIt->size();
194 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
195 // If the number of clusters is less than maxTasksPerFile
196 // we take the clusters as they are
197 if (nFolds == 0) {
198 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
199 continue;
200 }
201 // Otherwise, we have to merge clusters, distributing the reminder evenly
202 // onto the first clusters
203 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
204 const auto &clustersInThisFile = *clustersPerFileIt;
205 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
206 const auto start = clustersInThisFile[i].start;
207 // We lump together at least nFolds clusters, therefore
208 // we need to jump ahead of nFolds-1.
209 i += (nFolds - 1);
210 // We now add a cluster if we have some reminder left
211 if (nReminderClusters > 0) {
212 i += 1U;
213 nReminderClusters--;
214 }
215 const auto end = clustersInThisFile[i].end;
216 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
217 }
218 }
219
220 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
221}
222
223////////////////////////////////////////////////////////////////////////
224/// Return a vector containing the number of entries of each file of each friend TChain
225static std::vector<std::vector<Long64_t>>
226GetFriendEntries(const std::vector<std::pair<std::string, std::string>> &friendNames,
227 const std::vector<std::vector<std::string>> &friendFileNames)
228{
229 std::vector<std::vector<Long64_t>> friendEntries;
230 const auto nFriends = friendNames.size();
231 for (auto i = 0u; i < nFriends; ++i) {
232 std::vector<Long64_t> nEntries;
233 const auto &thisFriendName = friendNames[i].first;
234 const auto &thisFriendFiles = friendFileNames[i];
235 for (const auto &fname : thisFriendFiles) {
236 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
237 TTree *t = nullptr; // owned by TFile
238 f->GetObject(thisFriendName.c_str(), t);
239 nEntries.emplace_back(t->GetEntries());
240 }
241 friendEntries.emplace_back(std::move(nEntries));
242 }
243
244 return friendEntries;
245}
246
247////////////////////////////////////////////////////////////////////////
248/// Return the full path of the TTree or the trees in the TChain
249static std::vector<std::string> GetTreeFullPaths(const TTree &tree)
250{
251 // Case 1: this is a TChain. For each file it contains, GetName returns the name of the tree in that file
252 if (tree.IsA() == TChain::Class()) {
253 auto &chain = static_cast<const TChain &>(tree);
254 auto files = chain.GetListOfFiles();
255 if (!files || files->GetEntries() == 0) {
256 throw std::runtime_error("TTreeProcessorMT: input TChain does not contain any file");
257 }
258 std::vector<std::string> treeNames;
259 for (TObject *f : *files)
260 treeNames.emplace_back(f->GetName());
261
262 return treeNames;
263 }
264
265 // Case 2: this is a TTree: we get the full path of it
266 if (auto motherDir = tree.GetDirectory()) {
267 // We have 2 subcases (ROOT-9948):
268 // - 1. motherDir is a TFile
269 // - 2. motherDir is a directory
270 // If 1. we just return the name of the tree, if 2. we reconstruct the path
271 // to the file.
272 if (motherDir->InheritsFrom("TFile")) {
273 return {tree.GetName()};
274 }
275 std::string fullPath = motherDir->GetPath(); // e.g. "file.root:/dir"
276 fullPath = fullPath.substr(fullPath.find(":/") + 1); // e.g. "/dir"
277 fullPath += "/";
278 fullPath += tree.GetName(); // e.g. "/dir/tree"
279 return {fullPath};
280 }
281
282 // We do our best and return the name of the tree
283 return {tree.GetName()};
284}
285
286} // anonymous namespace
287
288namespace ROOT {
289
291
293
294namespace Internal {
295
296////////////////////////////////////////////////////////////////////////////////
297/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
298/// \param[in] treeNames Name of the tree for each file in `fileNames`.
299/// \param[in] fileNames Files to be opened.
300/// \param[in] friendInfo Information about TTree friends, if any.
301/// \param[in] nEntries Number of entries to be processed.
302/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
303void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
304 const FriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
305 const std::vector<std::vector<Long64_t>> &friendEntries)
306{
307 const std::vector<NameAlias> &friendNames = friendInfo.fFriendNames;
308 const std::vector<std::vector<std::string>> &friendFileNames = friendInfo.fFriendFileNames;
309
310 fChain.reset(new TChain());
311 const auto nFiles = fileNames.size();
312 for (auto i = 0u; i < nFiles; ++i) {
313 fChain->Add((fileNames[i] + "?query#" + treeNames[i]).c_str(), nEntries[i]);
314 }
315 fChain->ResetBit(TObject::kMustCleanup);
316
317 fFriends.clear();
318 const auto nFriends = friendNames.size();
319 for (auto i = 0u; i < nFriends; ++i) {
320 const auto &friendName = friendNames[i];
321 const auto &name = friendName.first;
322 const auto &alias = friendName.second;
323
324 // Build a friend chain
325 auto frChain = std::make_unique<TChain>(name.c_str());
326 const auto nFileNames = friendFileNames[i].size();
327 for (auto j = 0u; j < nFileNames; ++j)
328 frChain->Add(friendFileNames[i][j].c_str(), friendEntries[i][j]);
329
330 // Make it friends with the main chain
331 fChain->AddFriend(frChain.get(), alias.c_str());
332 fFriends.emplace_back(std::move(frChain));
333 }
334}
335
336//////////////////////////////////////////////////////////////////////////
337/// Get a TTreeReader for the current tree of this view.
338std::unique_ptr<TTreeReader>
339TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
340 const std::vector<std::string> &fileNames, const FriendInfo &friendInfo,
341 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
342 const std::vector<std::vector<Long64_t>> &friendEntries)
343{
344 const bool hasEntryList = entryList.GetN() > 0;
345 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
346 const bool needNewChain =
347 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
348 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
349 if (needNewChain) {
350 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
351 if (hasEntryList) {
352 fEntryList.reset(new TEntryList(entryList));
353 if (fEntryList->GetLists() != nullptr) {
354 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
355 // sub-lists of the former...
356 fChain->SetEntryList(fEntryList.get());
357 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
358 }
359 }
360 }
361 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
362 reader->SetEntriesRange(start, end);
363 return reader;
364}
365
366} // namespace Internal
367} // namespace ROOT
368
369////////////////////////////////////////////////////////////////////////////////
370/// Get and store the names, aliases and file names of the friends of the tree.
371/// \param[in] tree The main tree whose friends are to be inspected.
372///
373/// Note that "friends of friends" and circular references in the lists of friends are not supported.
375{
376 std::vector<Internal::NameAlias> friendNames;
377 std::vector<std::vector<std::string>> friendFileNames;
378
379 // Typically, the correct way to call GetListOfFriends would be `tree.GetTree()->GetListOfFriends()`
380 // (see e.g. the discussion at https://github.com/root-project/root/issues/6741).
381 // However, in this case, in case we are dealing with a TChain we really only care about the TChain's
382 // list of friends (which will need to be rebuilt in each processing task) while friends of the TChain's
383 // internal TTree, if any, will be automatically loaded in each task just like they would be automatically
384 // loaded here if we used tree.GetTree()->GetListOfFriends().
385 const auto friends = tree.GetListOfFriends();
386 if (!friends)
387 return Internal::FriendInfo();
388
389 for (auto fr : *friends) {
390 const auto frTree = static_cast<TFriendElement *>(fr)->GetTree();
391 const bool isChain = frTree->IsA() == TChain::Class();
392
393 friendFileNames.emplace_back();
394 auto &fileNames = friendFileNames.back();
395
396 // Check if friend tree/chain has an alias
397 const auto alias_c = tree.GetFriendAlias(frTree);
398 const std::string alias = alias_c != nullptr ? alias_c : "";
399
400 if (isChain) {
401 // Note that each TChainElement returned by chain.GetListOfFiles has a name
402 // equal to the tree name of this TChain and a title equal to the filename.
403 // Accessing the information like this ensures that we get the correct
404 // filenames and treenames if the treename is given as part of the filename
405 // via chain.AddFile(file.root/myTree) and as well if the tree name is given
406 // in the constructor via TChain(myTree) and a file is added later by chain.AddFile(file.root).
407
408 // Get name of the trees building the chain
409 const auto chainFiles = static_cast<TChain*>(frTree)->GetListOfFiles();
410 const auto realName = chainFiles->First()->GetName();
411 friendNames.emplace_back(std::make_pair(realName, alias));
412 // Get filenames stored in the title member
413 for (auto f : *chainFiles) {
414 fileNames.emplace_back(f->GetTitle());
415 }
416 } else {
417 // Get name of the tree
418 const auto realName = GetTreeFullPaths(*frTree)[0];
419 friendNames.emplace_back(std::make_pair(realName, alias));
420
421 // Get filename
422 const auto f = frTree->GetCurrentFile();
423 if (!f)
424 throw std::runtime_error("Friend trees with no associated file are not supported.");
425 fileNames.emplace_back(f->GetName());
426 }
427 }
428
429 return Internal::FriendInfo{std::move(friendNames), std::move(friendFileNames)};
430}
431
432/////////////////////////////////////////////////////////////////////////////////////////////////
433/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
434std::vector<std::string> TTreeProcessorMT::FindTreeNames()
435{
436 std::vector<std::string> treeNames;
437
438 if (fFileNames.empty()) // This can never happen
439 throw std::runtime_error("Empty list of files and no tree name provided");
440
442 for (const auto &fname : fFileNames) {
443 std::string treeName;
444 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
445 TIter next(f->GetListOfKeys());
446 while (auto *key = static_cast<TKey *>(next())) {
447 const char *className = key->GetClassName();
448 if (strcmp(className, "TTree") == 0) {
449 treeName = key->GetName();
450 break;
451 }
452 }
453 if (treeName.empty())
454 throw std::runtime_error("Cannot find any tree in file " + fname);
455 treeNames.emplace_back(std::move(treeName));
456 }
457
458 return treeNames;
459}
460
461////////////////////////////////////////////////////////////////////////
462/// Constructor based on a file name.
463/// \param[in] filename Name of the file containing the tree to process.
464/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
465/// for a TTree key in the file and will use the first one it finds.
466/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
467/// the same as for TThreadExecutor.
/// \throws std::runtime_error if `treename` is empty and FindTreeNames() cannot find a TTree key in the file.
468TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename, UInt_t nThreads)
469 : fFileNames({std::string(filename)}),
// NOTE(review): FindTreeNames() reads fFileNames, so this relies on fFileNames being initialized before
// fTreeNames, i.e. on the declaration order of the two members in the class -- confirm against the header.
470 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
471 fPool(nThreads)
472{
474}
475
/// Turn a non-empty list of string views into a list of owning strings, preserving the order.
/// \param[in] views The file names to convert.
/// \return A vector with one std::string per element of `views`.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (views.empty())
      throw std::runtime_error("The provided list of file names is empty");

   // the range constructor builds each std::string directly from the corresponding view
   return std::vector<std::string>(views.begin(), views.end());
}
487
488////////////////////////////////////////////////////////////////////////
489/// Constructor based on a collection of file names.
490/// \param[in] filenames Collection of the names of the files containing the tree to process.
491/// \param[in] treename Name of the tree to process. If not provided, the implementation will
492/// search filenames for a TTree key and will use the first one it finds in each file.
493/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
494/// the same as for TThreadExecutor.
/// \throws std::runtime_error if `filenames` is empty (see CheckAndConvert) or if `treename` is empty and
/// FindTreeNames() cannot find a TTree key in one of the files.
495///
496/// If different files contain TTrees with different names and automatic TTree name detection is not an option
497/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
498/// it to the appropriate TTreeProcessorMT constructor.
499TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
500 UInt_t nThreads)
501 : fFileNames(CheckAndConvert(filenames)),
// When a tree name is given, the same name is used for every file.
// NOTE(review): both alternatives below read fFileNames, so this relies on fFileNames being initialized
// before fTreeNames, i.e. on the declaration order of the two members in the class -- confirm against the header.
502 fTreeNames(treename.empty() ? FindTreeNames()
503 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
504 fFriendInfo(), fPool(nThreads)
505{
507}
508
509std::vector<std::string> GetFilesFromTree(TTree &tree)
510{
511 std::vector<std::string> filenames;
512
513 const bool isChain = tree.IsA() == TChain::Class();
514 if (isChain) {
515 TObjArray *filelist = static_cast<TChain &>(tree).GetListOfFiles();
516 const auto nFiles = filelist->GetEntries();
517 if (nFiles == 0)
518 throw std::runtime_error("The provided chain of files is empty");
519 filenames.reserve(nFiles);
520 for (auto f : *filelist)
521 filenames.emplace_back(f->GetTitle());
522 } else {
523 TFile *f = tree.GetCurrentFile();
524 if (!f) {
525 const auto msg = "The specified TTree is not linked to any file, in-memory-only trees are not supported.";
526 throw std::runtime_error(msg);
527 }
528
529 filenames.emplace_back(f->GetName());
530 }
531
532 return filenames;
533}
534
535////////////////////////////////////////////////////////////////////////
536/// Constructor based on a TTree and a TEntryList.
537/// \param[in] tree Tree or chain of files containing the tree to process.
538/// \param[in] entries List of entry numbers to process.
539/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
540/// the same as for TThreadExecutor.
542 : fFileNames(GetFilesFromTree(tree)), fTreeNames(GetTreeFullPaths(tree)), fEntryList(entries),
543 fFriendInfo(GetFriendInfo(tree)), fPool(nThreads)
544{
546}
547
548////////////////////////////////////////////////////////////////////////
549/// Constructor based on a TTree.
550/// \param[in] tree Tree or chain of files containing the tree to process.
551/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
552/// the same as for TThreadExecutor.
554{
555}
556
557//////////////////////////////////////////////////////////////////////////////
558/// Process the entries of a TTree in parallel. The user-provided function
559/// receives a TTreeReader which can be used to iterate on a subrange of
560/// entries
561/// ~~~{.cpp}
562/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
563/// // Select branches to read
564/// while (readerSubRange.Next()) {
565/// // Use content of current entry
566/// }
567/// });
568/// ~~~
569/// The user needs to be aware that each of the subranges can potentially
570/// be processed in parallel. This means that the code of the user function
571/// should be thread safe.
572///
573/// \param[in] func User-defined function that processes a subrange of entries
574void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
575{
576 const std::vector<Internal::NameAlias> &friendNames = fFriendInfo.fFriendNames;
577 const std::vector<std::vector<std::string>> &friendFileNames = fFriendInfo.fFriendFileNames;
578
579 // compute number of tasks per file
580 const unsigned int maxTasksPerFile =
581 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
582
583 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
584 // so we do it here for all files.
585 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
586 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
587 // sub-entrylists.
588 const bool hasFriends = !friendNames.empty();
589 const bool hasEntryList = fEntryList.GetN() > 0;
590 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
591 ClustersAndEntries clusterAndEntries{};
592 if (shouldRetrieveAllClusters) {
593 clusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile);
594 if (hasEntryList)
595 clusterAndEntries.first = ConvertToElistClusters(std::move(clusterAndEntries.first), fEntryList, fTreeNames,
596 fFileNames, clusterAndEntries.second);
597 }
598
599 const auto &clusters = clusterAndEntries.first;
600 const auto &entries = clusterAndEntries.second;
601
602 // Retrieve number of entries for each file for each friend tree
603 const auto friendEntries =
604 hasFriends ? GetFriendEntries(friendNames, friendFileNames) : std::vector<std::vector<Long64_t>>{};
605
606 // Parent task, spawns tasks that process each of the entry clusters for each input file
607 // TODO: for readability we should have two versions of this lambda, for shouldRetrieveAllClusters == true/false
608 auto processFile = [&](std::size_t fileIdx) {
609 // theseFiles contains either all files or just the single file to process
610 const auto &theseFiles = shouldRetrieveAllClusters ? fFileNames : std::vector<std::string>({fFileNames[fileIdx]});
611 // either all tree names or just the single tree to process
612 const auto &theseTrees = shouldRetrieveAllClusters ? fTreeNames : std::vector<std::string>({fTreeNames[fileIdx]});
613 // Evaluate clusters (with local entry numbers) and number of entries for this file, if needed
614 const auto theseClustersAndEntries =
615 shouldRetrieveAllClusters ? ClustersAndEntries{} : MakeClusters(theseTrees, theseFiles, maxTasksPerFile);
616
617 // All clusters for the file to process, either with global or local entry numbers
618 const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];
619
620 // Either all number of entries or just the ones for this file
621 const auto &theseEntries =
622 shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});
623
624 auto processCluster = [&](const EntryCluster &c) {
625 auto r = fTreeView->GetTreeReader(c.start, c.end, theseTrees, theseFiles, fFriendInfo, fEntryList,
626 theseEntries, friendEntries);
627 func(*r);
628 };
629
630 fPool.Foreach(processCluster, thisFileClusters);
631 };
632
633 std::vector<std::size_t> fileIdxs(fFileNames.size());
634 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
635
636 fPool.Foreach(processFile, fileIdxs);
637}
638
639////////////////////////////////////////////////////////////////////////
640/// \brief This function is deprecated in favor of GetTasksPerWorkerHint().
642{
644}
645
646////////////////////////////////////////////////////////////////////////
647/// \brief Retrieve the current value for the desired number of tasks per worker.
648/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as a hint.
650{
652}
653
654////////////////////////////////////////////////////////////////////////
655/// \brief This function is deprecated in favor of SetTasksPerWorkerHint().
656void TTreeProcessorMT::SetMaxTasksPerFilePerWorker(unsigned int maxTasksPerFile)
657{
658 fgMaxTasksPerFilePerWorker = maxTasksPerFile;
659}
660
661////////////////////////////////////////////////////////////////////////
662/// \brief Set the hint for the desired number of tasks created per worker.
663/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
664///
665/// This allows to create a reasonable number of tasks even if any of the
666/// processed files features a bad clustering, for example with a lot of
667/// entries and just a few entries per cluster, or to limit the number of
668/// tasks spawned when a very large number of files and workers is used.
669void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
670{
671 fgTasksPerWorkerHint = tasksPerWorkerHint;
672}
ROOT::R::TRInterface & r
Definition Object.C:4
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
long long Long64_t
Definition RtypesCore.h:73
#define gDirectory
Definition TDirectory.h:290
#define R__ASSERT(e)
Definition TError.h:120
char name[80]
Definition TGX11.cxx:110
std::vector< std::string > GetFilesFromTree(TTree &tree)
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const FriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Get a TTreeReader for the current tree of this view.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const FriendInfo &friendInfo, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A class to process the entries of a TTree in parallel.
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
static unsigned int GetMaxTasksPerFilePerWorker()
This function is deprecated in favor of GetTasksPerWorkerHint().
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
static void SetMaxTasksPerFilePerWorker(unsigned int m)
This function is deprecated in favor of SetTasksPerWorkerHint().
Internal::FriendInfo GetFriendInfo(TTree &tree)
Get and store the names, aliases and file names of the friends of the tree.
static unsigned int fgMaxTasksPerFilePerWorker
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u)
Constructor based on a file name.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
const Internal::FriendInfo fFriendInfo
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TObjArray * GetListOfFiles() const
Definition TChain.h:107
Small helper to keep current directory context.
Definition TDirectory.h:52
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TList * GetLists() const
Definition TEntryList.h:73
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Long64_t GetN() const
Definition TEntryList.h:75
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3997
A TFriendElement TF describes a TTree object TF in a file.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual void Add(TObject *obj)
Definition TList.h:87
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
An array of TObjects.
Definition TObjArray.h:37
Int_t GetEntries() const
Return the number of objects in array (i.e.
Mother of all ROOT objects.
Definition TObject.h:37
@ kCanDelete
if object in a list can be deleted
Definition TObject.h:58
@ kMustCleanup
if object destructor must call RecursiveRemove()
Definition TObject.h:60
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual Long64_t GetEntries() const
Definition TTree.h:460
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition TROOT.cxx:494
Definition tree.py:1
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
std::vector< Internal::NameAlias > fFriendNames
Pairs of names and aliases of friend trees/chains.