13 #ifndef ROOT_TProcessExecutor    14 #define ROOT_TProcessExecutor    31 #include <type_traits>     47    template<
class F, 
class Cond = noReferenceCond<F>>
    49    template<
class F, 
class INTEGER, 
class Cond = noReferenceCond<F, INTEGER>>
    51    template<
class F, 
class T, 
class Cond = noReferenceCond<F, T>>
    58    template<
class F, 
class R, 
class Cond = noReferenceCond<F>>
    60    template<
class F, 
class T, 
class R, 
class Cond = noReferenceCond<F, T>>
    64    template<
class T, 
class R> 
T Reduce(
const std::vector<T> &objs, 
R redfunc);
    67    template<
class T> 
void Collect(std::vector<T> &reslist);
    80    enum class ETask : unsigned char {
    99 template<
class F, 
class Cond>
   102    using retType = decltype(func());
   109    if (nTimes < oldNWorkers)
   112    bool ok = 
Fork(worker);
   116       Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
   117       return std::vector<retType>();
   122    std::vector<retType> reslist;
   123    reslist.reserve(fNToProcess);
   141 template<
class F, 
class T, 
class Cond>
   145    using retType = decltype(func(args.front()));
   153    if (args.size() < oldNWorkers)
   156    bool ok = 
Fork(worker);
   160       Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
   161       return std::vector<retType>();
   166    std::vector<retType> reslist;
   167    reslist.reserve(fNToProcess);
   168    std::vector<unsigned> range(fNToProcess);
   169    std::iota(range.begin(), range.end(), 0);
   185 template<
class F, 
class INTEGER, 
class Cond>
   188    std::vector<INTEGER> vargs(args.size());
   189    std::copy(args.begin(), args.end(), vargs.begin());
   190    const auto &reslist = 
Map(func, vargs);
   200 template<
class F, 
class R, 
class Cond>
   203    using retType = decltype(func());
   210    if (nTimes < oldNWorkers)
   213    bool ok = 
Fork(worker);
   216       std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
   222    std::vector<retType> reslist;
   232    return redfunc(reslist);
   241 template<
class F, 
class T, 
class R, 
class Cond>
   245    using retType = decltype(func(args.front()));
   252    if (args.size() < oldNWorkers)
   255    bool ok = 
Fork(worker);
   258       std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
   259       return decltype(func(args.front()))();
   264    std::vector<retType> reslist;
   267    std::iota(range.begin(), range.end(), 0);
   275    return Reduce(reslist, redfunc);
   281 template<
class T, 
class R>
   285    static_assert(std::is_same<decltype(redfunc(objs)), 
T>::value, 
"redfunc does not have the correct signature");
   286    return redfunc(objs);
   294    unsigned code = msg.first;
   296       reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
   301       if(msg.second != 
nullptr)
   302          reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
   305       const char *str = ReadBuffer<const char*>(msg.second.get());
   306       Error(
"TProcessExecutor::HandlePoolCode", 
"[E][C] a worker encountered an error: %s\n"   307                                          "Continuing execution ignoring these entries.", str);
   312       Error(
"TProcessExecutor::HandlePoolCode", 
"[W][C] unknown code received from server. code=%d", code);
   329          Error(
"TProcessExecutor::Collect", 
"[E][C] Lost connection to a worker");
   331       } 
else if (msg.first < 1000)
 
TProcessExecutor(unsigned nWorkers=0)
Class constructor. 
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes. 
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list. 
Namespace for new ROOT classes and functions. 
This class works together with TProcessExecutor to allow the execution of functions in server process...
unsigned GetNWorkers() const
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket. 
Error while reading from the socket. 
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time. 
unsigned fNProcessed
number of arguments already passed to the workers 
void Remove(TSocket *s)
Remove a certain socket from the monitor. 
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker. 
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result. 
a Map method with arguments is being executed 
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle. 
The message contains the result of the processing of a TTree. 
void Reset()
Reset TProcessExecutor's state. 
TSocket * Select()
Return pointer to socket for which an event is waiting. 
a MapReduce method with arguments is being executed 
The message contains the result of a function execution. 
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message. 
Execute function with the argument contained in the message. 
a Map method with no arguments is being executed 
ETask fTaskType
the kind of task that is being executed, if any 
no task is being executed 
Used by the client to tell servers to shutdown. 
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers 
A pseudo container class which is a generator of indices. 
Base class for multiprocess applications' clients. 
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork() 
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode. 
a MapReduce method with no arguments is being executed 
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel. 
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided. 
virtual void ActivateAll()
Activate all de-activated sockets. 
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers. 
Tell the client there was an error while processing. 
ETask
A collection of the types of tasks that TProcessExecutor can execute. 
We are ready for the next task. 
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids. 
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket. 
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function. 
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required. 
Execute function without arguments. 
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...