13#ifndef ROOT_TProcessExecutor 
   14#define ROOT_TProcessExecutor 
   40   template <
typename F, 
typename... Args>
 
   58   template<
class F, 
class R, 
class Cond = noReferenceCond<F>>
 
   60   template<
class F, 
class T, 
class R, 
class Cond = noReferenceCond<F, T>>
 
   62   template<
class F, 
class T, 
class R, 
class Cond = noReferenceCond<F, T>>
 
   80   template<
class F, 
class Cond = noReferenceCond<F>>
 
   81   auto MapImpl(
F func, 
unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
 
   82   template<
class F, 
class INTEGER, 
class Cond = noReferenceCond<F, INTEGER>>
 
   84   template<
class F, 
class T, 
class Cond = noReferenceCond<F, T>>
 
   85   auto MapImpl(
F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
 
   86   template<
class F, 
class T, 
class Cond = noReferenceCond<F, T>>
 
   87   auto MapImpl(
F func, 
const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
 
   89   template<
class T> 
void Collect(std::vector<T> &reslist);
 
  121template<
class F, 
class Cond>
 
  124   using retType = 
decltype(func());
 
  127   fTaskType = ETask::kMap;
 
  130   unsigned oldNWorkers = GetPoolSize();
 
  131   if (nTimes < oldNWorkers)
 
  134   bool ok = Fork(worker);
 
  135   SetNWorkers(oldNWorkers);
 
  138      Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
 
  139      return std::vector<retType>();
 
  143   fNToProcess = nTimes;
 
  144   std::vector<retType> reslist;
 
  145   reslist.reserve(fNToProcess);
 
  153   fTaskType = ETask::kNoTask;
 
  162template<
class F, 
class T, 
class Cond>
 
  166   using retType = 
decltype(func(args.front()));
 
  169   fTaskType = ETask::kMapWithArg;
 
  173   unsigned oldNWorkers = GetPoolSize();
 
  174   if (args.size() < oldNWorkers)
 
  175      SetNWorkers(args.size());
 
  177   bool ok = Fork(worker);
 
  178   SetNWorkers(oldNWorkers);
 
  181      Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
 
  182      return std::vector<retType>();
 
  186   fNToProcess = args.size();
 
  187   std::vector<retType> reslist;
 
  188   reslist.reserve(fNToProcess);
 
  189   std::vector<unsigned> range(fNToProcess);
 
  190   std::iota(range.begin(), range.end(), 0);
 
  198   fTaskType = ETask::kNoTask;
 
  207template<
class F, 
class T, 
class Cond>
 
  211   using retType = 
decltype(func(args.front()));
 
  214   fTaskType = ETask::kMapWithArg;
 
  218   unsigned oldNWorkers = GetPoolSize();
 
  219   if (args.size() < oldNWorkers)
 
  220      SetNWorkers(args.size());
 
  222   bool ok = Fork(worker);
 
  223   SetNWorkers(oldNWorkers);
 
  226      Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
 
  227      return std::vector<retType>();
 
  231   fNToProcess = args.size();
 
  232   std::vector<retType> reslist;
 
  233   reslist.reserve(fNToProcess);
 
  234   std::vector<unsigned> range(fNToProcess);
 
  235   std::iota(range.begin(), range.end(), 0);
 
  243   fTaskType = ETask::kNoTask;
 
  252template<
class F, 
class INTEGER, 
class Cond>
 
  255   std::vector<INTEGER> vargs(args.size());
 
  256   std::copy(args.begin(), args.end(), vargs.begin());
 
  257   const auto &reslist = Map(func, vargs);
 
  264template<
class F, 
class R, 
class Cond>
 
  267   using retType = 
decltype(func());
 
  270   fTaskType= ETask::kMapRed;
 
  273   unsigned oldNWorkers = GetPoolSize();
 
  274   if (nTimes < oldNWorkers)
 
  277   bool ok = Fork(worker);
 
  278   SetNWorkers(oldNWorkers);
 
  280      std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
 
  285   fNToProcess = nTimes;
 
  286   std::vector<retType> reslist;
 
  287   reslist.reserve(fNToProcess);
 
  295   fTaskType= ETask::kNoTask;
 
  296   return redfunc(reslist);
 
  304template<
class F, 
class T, 
class R, 
class Cond>
 
  308   using retType = 
decltype(func(args.front()));
 
  311   fTaskType= ETask::kMapRedWithArg;
 
  314   unsigned oldNWorkers = GetPoolSize();
 
  315   if (args.size() < oldNWorkers)
 
  316      SetNWorkers(args.size());
 
  318   bool ok = Fork(worker);
 
  319   SetNWorkers(oldNWorkers);
 
  321      std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
 
  322      return decltype(func(args.front()))();
 
  326   fNToProcess = args.size();
 
  327   std::vector<retType> reslist;
 
  328   reslist.reserve(fNToProcess);
 
  329   std::vector<unsigned> range(fNToProcess);
 
  330   std::iota(range.begin(), range.end(), 0);
 
  337   fTaskType= ETask::kNoTask;
 
  338   return Reduce(reslist, redfunc);
 
  346template<
class F, 
class T, 
class R, 
class Cond>
 
  350   using retType = 
decltype(func(args.front()));
 
  353   fTaskType= ETask::kMapRedWithArg;
 
  356   unsigned oldNWorkers = GetPoolSize();
 
  357   if (args.size() < oldNWorkers)
 
  358      SetNWorkers(args.size());
 
  360   bool ok = Fork(worker);
 
  361   SetNWorkers(oldNWorkers);
 
  363      std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
 
  364      return decltype(func(args.front()))();
 
  368   fNToProcess = args.size();
 
  369   std::vector<retType> reslist;
 
  370   reslist.reserve(fNToProcess);
 
  371   std::vector<unsigned> range(fNToProcess);
 
  372   std::iota(range.begin(), range.end(), 0);
 
  379   fTaskType= ETask::kNoTask;
 
  380   return Reduce(reslist, redfunc);
 
  388   unsigned code = msg.first;
 
  390      reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
 
  395      if(msg.second != 
nullptr)
 
  396         reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
 
  399      const char *str = ReadBuffer<const char*>(msg.second.get());
 
  400      Error(
"TProcessExecutor::HandlePoolCode", 
"[E][C] a worker encountered an error: %s\n" 
  401                                         "Continuing execution ignoring these entries.", str);
 
  406      Error(
"TProcessExecutor::HandlePoolCode", 
"[W][C] unknown code received from server. code=%d", code);
 
  423         Error(
"TProcessExecutor::Collect", 
"[E][C] Lost connection to a worker");
 
  425      } 
else if (msg.first < 1000)
 
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
auto Map(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times.
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
unsigned fNToProcess
total number of arguments to pass to the workers
auto MapImpl(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times in parallel.
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
~TProcessExecutor()=default
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kFuncResult
The message contains the result of a function execution.
@ kExecFuncWithArg
Execute function with the argument contained in the message.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcError
Tell the client there was an error while processing.
@ kExecFunc
Execute function without arguments.
@ kProcResult
The message contains the result of the processing of a TTree.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.