13#ifndef ROOT_TTreeProcessorMP 
   14#define ROOT_TTreeProcessorMP 
   39   template <
typename F, 
typename... Args>
 
 
  197   static_assert(std::is_constructible<TObject*, retType>::value,
 
  198                 "procFunc must return a pointer to a class inheriting from TObject," 
  199                 " and must take a reference to TTreeReader as the only argument");
 
  203      Warning(
"Process", 
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
 
  212   TEntryList *elist = (entries.IsValid()) ? &entries : 
nullptr;
 
  217      Error(
"TTreeProcessorMP::Process", 
"[E][C] Could not fork. Aborting operation.");
 
  224      fTaskType = ETask::kProcByRange;
 
  227      std::vector<unsigned> args(
nWorkers);
 
  228      std::iota(args.begin(), args.end(), 0);
 
  231         Error(
"TTreeProcessorMP::Process", 
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
 
  234      fTaskType = ETask::kProcByFile;
 
  236      std::vector<unsigned> args(
nWorkers);
 
  237      std::iota(args.begin(), args.end(), 0);
 
  240         Error(
"TTreeProcessorMP::Process", 
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
 
  253   fTaskType = ETask::kNoTask;
 
  254   return static_cast<retType>(res);
 
 
  303   static_assert(std::is_constructible<TObject*, retType>::value, 
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
 
  307      Warning(
"Process", 
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
 
  316   TEntryList *elist = (entries.IsValid()) ? &entries : 
nullptr;
 
  321      Error(
"TTreeProcessorMP::Process", 
"[E][C] Could not fork. Aborting operation.");
 
  326   fTaskType = ETask::kProcByRange;
 
  330   std::vector<unsigned> args(
nWorkers);
 
  331   std::iota(args.begin(), args.end(), 0);
 
  334      Error(
"TTreeProcessorMP::Process", 
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
 
  346   fTaskType = ETask::kNoTask;
 
  347   return static_cast<retType>(res);
 
 
  409   unsigned code = 
msg.first;
 
  413      if(
msg.second != 
nullptr)
 
  418      Error(
"TTreeProcessorMP::HandlePoolCode", 
"[E][C] a worker encountered an error: %s\n" 
  419                                         "Continuing execution ignoring these entries.", str);
 
  424      Error(
"TTreeProcessorMP::HandlePoolCode", 
"[W][C] unknown code received from server. code=%d", code);
 
 
  437   while (
mon.GetActive() > 0) {
 
  441         Error(
"TTreeProcessorMP::Collect", 
"[E][C] Lost connection to a worker");
 
  443      } 
else if (
msg.first < 1000)
 
 
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
unsigned long long ULong64_t
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Merge collection of TObjects.
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
~TTreeProcessorMP()=default
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A TTree represents a columnar dataset.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...