Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include "TROOT.h"
29
30using namespace ROOT;
31
32namespace {
33
34/// A cluster of entries
35struct EntryCluster {
36 Long64_t start;
37 Long64_t end;
38};
39
40// note that this routine assumes global entry numbers
41static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryCluster>> &cls)
42{
43 Long64_t last_end = 0ll;
44 for (const auto &fcl : cls) {
45 for (const auto &c : fcl) {
46 if (last_end != c.start)
47 return false;
48 last_end = c.end;
49 }
50 }
51 return true;
52}
53
54/// Take a vector of vectors of EntryClusters (a vector per file), filter the entries according to entryList, and
55/// and return a new vector of vectors of EntryClusters where cluster start/end entry numbers have been converted to
56/// TEntryList-local entry numbers.
57///
58/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
59/// ascending order, i.e., for n > m:
60/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
61static std::vector<std::vector<EntryCluster>>
62ConvertToElistClusters(std::vector<std::vector<EntryCluster>> &&clusters, TEntryList &entryList,
63 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
64 const std::vector<Long64_t> &entriesPerFile)
65{
66 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
67 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
68
69 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
70 const auto nFiles = clusters.size();
71
72 std::unique_ptr<TChain> chain;
73 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
74 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
75 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
76 NextFn_t Next;
77 if (listHasGlobalEntryNumbers) {
78 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
79 ++elEntry;
80 return elist.Next();
81 };
82 } else {
83 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
85 for (auto i = 0u; i < nFiles; ++i)
86 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
87 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
88 ++elEntry;
89 int treenum = -1;
90 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
91 if (localEntry == -1ll)
92 return localEntry;
93 return localEntry + ch->GetTreeOffset()[treenum];
94 };
95 }
96
97 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
98 // so we can be sure TEntryList::Next will return the correct thing
99 Long64_t elistEntry = 0ll;
100 Long64_t entry = entryList.GetEntry(elistEntry);
101
102 std::vector<std::vector<EntryCluster>> elistClusters;
103
104 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
105 std::vector<EntryCluster> elistClustersForFile;
106 for (const auto &c : clusters[fileN]) {
107 if (entry >= c.end || entry == -1ll) // no entrylist entries in this cluster
108 continue;
109 R__ASSERT(entry >= c.start); // current entry should never come before the cluster we are looking at
110 const Long64_t elistRangeStart = elistEntry;
111 // advance entry list until the entrylist entry goes beyond the end of the cluster
112 while (entry < c.end && entry != -1ll)
113 entry = Next(elistEntry, entryList, chain.get());
114 elistClustersForFile.emplace_back(EntryCluster{elistRangeStart, elistEntry});
115 }
116 elistClusters.emplace_back(std::move(elistClustersForFile));
117 }
118
119 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
120 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
121
122 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
123 return elistClusters;
124}
125
126// EntryClusters and number of entries per file
127using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
128
129////////////////////////////////////////////////////////////////////////
130/// Return a vector of cluster boundaries for the given tree and files.
131static ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
132 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile)
133{
134 // Note that as a side-effect of opening all files that are going to be used in the
135 // analysis once, all necessary streamers will be loaded into memory.
137 const auto nFileNames = fileNames.size();
138 std::vector<std::vector<EntryCluster>> clustersPerFile;
139 std::vector<Long64_t> entriesPerFile;
140 entriesPerFile.reserve(nFileNames);
141 Long64_t offset = 0ll;
142 for (auto i = 0u; i < nFileNames; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
145
146 std::unique_ptr<TFile> f(TFile::Open(
147 fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")); // need TFile::Open to load plugins if need be
148 if (!f || f->IsZombie()) {
149 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
150 throw std::runtime_error(msg);
151 }
152 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
153
154 if (!t) {
155 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
156 "\" from file \"" + fileName + "\"";
157 throw std::runtime_error(msg);
158 }
159
160 // Avoid calling TROOT::RecursiveRemove for this tree, it takes the read lock and we don't need it.
161 t->ResetBit(kMustCleanup);
162 auto clusterIter = t->GetClusterIterator(0);
163 Long64_t start = 0ll, end = 0ll;
164 const Long64_t entries = t->GetEntries();
165 // Iterate over the clusters in the current file
166 std::vector<EntryCluster> clusters;
167 while ((start = clusterIter()) < entries) {
168 end = clusterIter.GetNextEntry();
169 // Add the current file's offset to start and end to make them (chain) global
170 clusters.emplace_back(EntryCluster{start + offset, end + offset});
171 }
172 offset += entries;
173 clustersPerFile.emplace_back(std::move(clusters));
174 entriesPerFile.emplace_back(entries);
175 }
176
177 // Here we "fuse" clusters together if the number of clusters is too big with respect to
178 // the number of slots, otherwise we can incur in an overhead which is big enough
179 // to make parallelisation detrimental to performance.
180 // For example, this is the case when, following a merging of many small files, a file
181 // contains a tree with many entries and with clusters of just a few entries each.
182 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
183 // of files (e.g. 1000 files): the large amount of files might result in a large amount
184 // of tasks, but the elevated concurrency level makes the little synchronization required by
185 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
186 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
187 //
188 // The criterion according to which we fuse clusters together is to have around
189 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
190 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
191
192 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
193 auto clustersPerFileIt = clustersPerFile.begin();
194 auto eventRangesPerFileIt = eventRangesPerFile.begin();
195 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
196 const auto clustersInThisFileSize = clustersPerFileIt->size();
197 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
198 // If the number of clusters is less than maxTasksPerFile
199 // we take the clusters as they are
200 if (nFolds == 0) {
201 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
202 continue;
203 }
204 // Otherwise, we have to merge clusters, distributing the reminder evenly
205 // onto the first clusters
206 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
207 const auto &clustersInThisFile = *clustersPerFileIt;
208 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
209 const auto start = clustersInThisFile[i].start;
210 // We lump together at least nFolds clusters, therefore
211 // we need to jump ahead of nFolds-1.
212 i += (nFolds - 1);
213 // We now add a cluster if we have some reminder left
214 if (nReminderClusters > 0) {
215 i += 1U;
216 nReminderClusters--;
217 }
218 const auto end = clustersInThisFile[i].end;
219 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
220 }
221 }
222
223 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
224}
225
226////////////////////////////////////////////////////////////////////////
227/// Return a vector containing the number of entries of each file of each friend TChain
228static std::vector<std::vector<Long64_t>> GetFriendEntries(const Internal::TreeUtils::RFriendInfo &friendInfo)
229{
230
231 const auto &friendNames = friendInfo.fFriendNames;
232 const auto &friendFileNames = friendInfo.fFriendFileNames;
233 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
234
235 std::vector<std::vector<Long64_t>> friendEntries;
236 const auto nFriends = friendNames.size();
237 for (auto i = 0u; i < nFriends; ++i) {
238 std::vector<Long64_t> nEntries;
239 const auto &thisFriendName = friendNames[i].first;
240 const auto &thisFriendFiles = friendFileNames[i];
241 const auto &thisFriendChainSubNames = friendChainSubNames[i];
242 // If this friend has chain sub names, it means it's a TChain.
243 // In this case, we need to traverse all files that make up the TChain,
244 // retrieve the correct sub tree from each file and store the number of
245 // entries for that sub tree.
246 if (!thisFriendChainSubNames.empty()) {
247 // Traverse together filenames and respective treenames
248 for (auto fileidx = 0u; fileidx < thisFriendFiles.size(); ++fileidx) {
249 std::unique_ptr<TFile> curfile(
250 TFile::Open(thisFriendFiles[fileidx].c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
251 if (!curfile || curfile->IsZombie())
252 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" +
253 thisFriendFiles[fileidx] + "\"");
254 // thisFriendChainSubNames[fileidx] stores the name of the current
255 // subtree in the TChain stored in the current file.
256 TTree *curtree = curfile->Get<TTree>(thisFriendChainSubNames[fileidx].c_str());
257 if (!curtree)
258 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
259 thisFriendChainSubNames[fileidx] + "\" from file \"" +
260 thisFriendFiles[fileidx] + "\"");
261 nEntries.emplace_back(curtree->GetEntries());
262 }
263 // Otherwise, if there are no sub names for the current friend, it means
264 // it's a TTree. We can safely use `thisFriendName` as the name of the tree
265 // to retrieve from the file in `thisFriendFiles`
266 } else {
267 for (const auto &fname : thisFriendFiles) {
268 std::unique_ptr<TFile> f(TFile::Open(fname.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
269 if (!f || f->IsZombie())
270 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" + fname + "\"");
271 TTree *t = f->Get<TTree>(thisFriendName.c_str());
272 if (!t)
273 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
274 thisFriendName + "\" from file \"" + fname + "\"");
275 nEntries.emplace_back(t->GetEntries());
276 }
277 }
278 // Store the vector with entries for each file in the current tree/chain.
279 friendEntries.emplace_back(std::move(nEntries));
280 }
281
282 return friendEntries;
283}
284
285} // anonymous namespace
286
287namespace ROOT {
288
290
291namespace Internal {
292
293////////////////////////////////////////////////////////////////////////////////
294/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
295/// \param[in] treeNames Name of the tree for each file in `fileNames`.
296/// \param[in] fileNames Files to be opened.
297/// \param[in] friendInfo Information about TTree friends, if any.
298/// \param[in] nEntries Number of entries to be processed.
299/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
300void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
301 const TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
302 const std::vector<std::vector<Long64_t>> &friendEntries)
303{
304
305 const auto &friendNames = friendInfo.fFriendNames;
306 const auto &friendFileNames = friendInfo.fFriendFileNames;
307 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
308
310 const auto nFiles = fileNames.size();
311 for (auto i = 0u; i < nFiles; ++i) {
312 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
313 }
314 fChain->ResetBit(TObject::kMustCleanup);
315
316 fFriends.clear();
317 const auto nFriends = friendNames.size();
318 for (auto i = 0u; i < nFriends; ++i) {
319 const auto &thisFriendNameAlias = friendNames[i];
320 const auto &thisFriendName = thisFriendNameAlias.first;
321 const auto &thisFriendAlias = thisFriendNameAlias.second;
322 const auto &thisFriendFiles = friendFileNames[i];
323 const auto &thisFriendChainSubNames = friendChainSubNames[i];
324 const auto &thisFriendEntries = friendEntries[i];
325
326 // Build a friend chain
327 auto frChain = std::make_unique<TChain>(thisFriendName.c_str(), "", TChain::kWithoutGlobalRegistration);
328 const auto nFileNames = friendFileNames[i].size();
329 if (thisFriendChainSubNames.empty()) {
330 // If there are no chain subnames, the friend was a TTree. It's safe
331 // to add to the chain the filename directly.
332 for (auto j = 0u; j < nFileNames; ++j) {
333 frChain->Add(thisFriendFiles[j].c_str(), thisFriendEntries[j]);
334 }
335 } else {
336 // Otherwise, the new friend chain needs to be built using the nomenclature
337 // "filename/treename" as argument to `TChain::Add`
338 for (auto j = 0u; j < nFileNames; ++j) {
339 frChain->Add((thisFriendFiles[j] + "?#" + thisFriendChainSubNames[j]).c_str(), thisFriendEntries[j]);
340 }
341 }
342
343 // Make it friends with the main chain
344 fChain->AddFriend(frChain.get(), thisFriendAlias.c_str());
345 fFriends.emplace_back(std::move(frChain));
346 }
347}
348
349//////////////////////////////////////////////////////////////////////////
350/// Get a TTreeReader for the current tree of this view.
351std::unique_ptr<TTreeReader>
352TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
353 const std::vector<std::string> &fileNames, const TreeUtils::RFriendInfo &friendInfo,
354 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
355 const std::vector<std::vector<Long64_t>> &friendEntries)
356{
357 const bool hasEntryList = entryList.GetN() > 0;
358 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
359 const bool needNewChain =
360 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
361 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
362 if (needNewChain) {
363 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
364 if (hasEntryList) {
365 fEntryList.reset(new TEntryList(entryList));
366 if (fEntryList->GetLists() != nullptr) {
367 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
368 // sub-lists of the former...
369 fChain->SetEntryList(fEntryList.get());
370 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
371 }
372 }
373 }
374 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
375 reader->SetEntriesRange(start, end);
376 return reader;
377}
378
379////////////////////////////////////////////////////////////////////////
380/// Clear the resources
382{
383 fChain.reset();
384 fEntryList.reset();
385 fFriends.clear();
386}
387
388} // namespace Internal
389} // namespace ROOT
390
391/////////////////////////////////////////////////////////////////////////////////////////////////
392/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
393std::vector<std::string> TTreeProcessorMT::FindTreeNames()
394{
395 std::vector<std::string> treeNames;
396
397 if (fFileNames.empty()) // This can never happen
398 throw std::runtime_error("Empty list of files and no tree name provided");
399
401 for (const auto &fname : fFileNames) {
402 std::string treeName;
403 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
404 TIter next(f->GetListOfKeys());
405 while (auto *key = static_cast<TKey *>(next())) {
406 const char *className = key->GetClassName();
407 if (strcmp(className, "TTree") == 0) {
408 treeName = key->GetName();
409 break;
410 }
411 }
412 if (treeName.empty())
413 throw std::runtime_error("Cannot find any tree in file " + fname);
414 treeNames.emplace_back(std::move(treeName));
415 }
416
417 return treeNames;
418}
419
420////////////////////////////////////////////////////////////////////////
421/// Constructor based on a file name.
422/// \param[in] filename Name of the file containing the tree to process.
423/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
424/// for a TTree key in the file and will use the first one it finds.
425/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
426/// the same as for TThreadExecutor.
427TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename, UInt_t nThreads)
428 : fFileNames({std::string(filename)}),
429 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
430 fPool(nThreads)
431{
433}
434
/// Convert a non-empty list of string views into owning strings.
/// \param[in] views The names to convert.
/// \return A vector with one std::string per element of `views`, in the same order.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (views.empty())
      throw std::runtime_error("The provided list of file names is empty");

   // The range constructor builds each std::string directly from the corresponding view.
   return std::vector<std::string>(views.begin(), views.end());
}
446
447////////////////////////////////////////////////////////////////////////
448/// Constructor based on a collection of file names.
449/// \param[in] filenames Collection of the names of the files containing the tree to process.
450/// \param[in] treename Name of the tree to process. If not provided, the implementation will
451/// search filenames for a TTree key and will use the first one it finds in each file.
452/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
453/// the same as for TThreadExecutor.
454///
455/// If different files contain TTrees with different names and automatic TTree name detection is not an option
456/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
457/// it to the appropriate TTreeProcessorMT constructor.
458TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
459 UInt_t nThreads)
460 : fFileNames(CheckAndConvert(filenames)),
461 fTreeNames(treename.empty() ? FindTreeNames()
462 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
463 fFriendInfo(), fPool(nThreads)
464{
466}
467
468////////////////////////////////////////////////////////////////////////
469/// Constructor based on a TTree and a TEntryList.
470/// \param[in] tree Tree or chain of files containing the tree to process.
471/// \param[in] entries List of entry numbers to process.
472/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
473/// the same as for TThreadExecutor.
475 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
476 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)), fEntryList(entries),
477 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree)), fPool(nThreads)
478{
480}
481
482////////////////////////////////////////////////////////////////////////
483/// Constructor based on a TTree.
484/// \param[in] tree Tree or chain of files containing the tree to process.
485/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
486/// the same as for TThreadExecutor.
488
489//////////////////////////////////////////////////////////////////////////////
490/// Process the entries of a TTree in parallel. The user-provided function
491/// receives a TTreeReader which can be used to iterate on a subrange of
492/// entries
493/// ~~~{.cpp}
494/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
495/// // Select branches to read
496/// while (readerSubRange.Next()) {
497/// // Use content of current entry
498/// }
499/// });
500/// ~~~
501/// The user needs to be aware that each of the subranges can potentially
502/// be processed in parallel. This means that the code of the user function
503/// should be thread safe.
504///
505/// \param[in] func User-defined function that processes a subrange of entries
506void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
507{
508 // compute number of tasks per file
509 const unsigned int maxTasksPerFile =
510 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
511
512 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
513 // so we do it here for all files.
514 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
515 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
516 // sub-entrylists.
517 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
518 const bool hasEntryList = fEntryList.GetN() > 0;
519 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
520 ClustersAndEntries clusterAndEntries{};
521 if (shouldRetrieveAllClusters) {
522 clusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile);
523 if (hasEntryList)
524 clusterAndEntries.first = ConvertToElistClusters(std::move(clusterAndEntries.first), fEntryList, fTreeNames,
525 fFileNames, clusterAndEntries.second);
526 }
527
528 const auto &clusters = clusterAndEntries.first;
529 const auto &entries = clusterAndEntries.second;
530
531 // Retrieve number of entries for each file for each friend tree
532 const auto friendEntries = hasFriends ? GetFriendEntries(fFriendInfo) : std::vector<std::vector<Long64_t>>{};
533
534 // Parent task, spawns tasks that process each of the entry clusters for each input file
535 // TODO: for readability we should have two versions of this lambda, for shouldRetrieveAllClusters == true/false
536 auto processFile = [&](std::size_t fileIdx) {
537 // theseFiles contains either all files or just the single file to process
538 const auto &theseFiles = shouldRetrieveAllClusters ? fFileNames : std::vector<std::string>({fFileNames[fileIdx]});
539 // either all tree names or just the single tree to process
540 const auto &theseTrees = shouldRetrieveAllClusters ? fTreeNames : std::vector<std::string>({fTreeNames[fileIdx]});
541 // Evaluate clusters (with local entry numbers) and number of entries for this file, if needed
542 const auto theseClustersAndEntries =
543 shouldRetrieveAllClusters ? ClustersAndEntries{} : MakeClusters(theseTrees, theseFiles, maxTasksPerFile);
544
545 // All clusters for the file to process, either with global or local entry numbers
546 const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];
547
548 // Either all number of entries or just the ones for this file
549 const auto &theseEntries =
550 shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});
551
552 auto processCluster = [&](const EntryCluster &c) {
553 auto r = fTreeView->GetTreeReader(c.start, c.end, theseTrees, theseFiles, fFriendInfo, fEntryList,
554 theseEntries, friendEntries);
555 func(*r);
556 };
557
558 fPool.Foreach(processCluster, thisFileClusters);
559 };
560
561 std::vector<std::size_t> fileIdxs(fFileNames.size());
562 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
563
564 fPool.Foreach(processFile, fileIdxs);
565
566 // make sure TChains and TFiles are cleaned up since they are not globally tracked
567 for (unsigned int islot = 0; islot < fTreeView.GetNSlots(); ++islot) {
569 if (view != nullptr) {
570 view->Reset();
571 }
572 }
573}
574
575////////////////////////////////////////////////////////////////////////
576/// \brief Retrieve the current value for the desired number of tasks per worker.
577/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
579{
581}
582
583////////////////////////////////////////////////////////////////////////
584/// \brief Set the hint for the desired number of tasks created per worker.
585/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
586///
587/// This allows to create a reasonable number of tasks even if any of the
588/// processed files features a bad clustering, for example with a lot of
589/// entries and just a few entries per cluster, or to limit the number of
590/// tasks spawned when a very large number of files and workers is used.
591void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
592{
593 fgTasksPerWorkerHint = tasksPerWorkerHint;
594}
ROOT::R::TRInterface & r
Definition Object.C:4
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:80
#define gDirectory
Definition TDirectory.h:385
#define R__ASSERT(e)
Definition TError.h:118
@ kMustCleanup
Definition TObject.h:370
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Get a TTreeReader for the current tree of this view.
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetNSlots() const
Return the number of currently available slots.
A class to process the entries of a TTree in parallel.
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
const Internal::TreeUtils::RFriendInfo fFriendInfo
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u)
Constructor based on a file name.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
@ kWithoutGlobalRegistration
Definition TChain.h:70
TDirectory::TContext keeps track of and restores the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TList * GetLists() const
Definition TEntryList.h:75
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain. See also Next().
virtual Long64_t GetN() const
Definition TEntryList.h:77
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4025
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual void Add(TObject *obj)
Definition TList.h:81
@ kCanDelete
if object in a list can be deleted
Definition TObject.h:62
@ kMustCleanup
if object destructor must call RecursiveRemove()
Definition TObject.h:64
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual Long64_t GetEntries() const
Definition TTree.h:460
Different standalone functions to work with trees and tuples, not required to be a member of any cla...
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition TROOT.cxx:493
Definition tree.py:1
Information about friend trees of a certain TTree or TChain object.
std::vector< NameAlias > fFriendNames
Pairs of names and aliases of friend trees/chains.
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
std::vector< std::vector< std::string > > fFriendChainSubNames
Names of the subtrees of a friend TChain.