12 #ifndef ROOT_TProcPool
13 #define ROOT_TProcPool
31 #include <initializer_list>
32 #include <type_traits>
// Class constructor.
// nWorkers defaults to 0. NOTE(review): what a value of 0 selects is not
// visible in this excerpt -- confirm against the constructor's implementation.
40 explicit TProcPool(
unsigned nWorkers = 0);
// Execute func (with no arguments) nTimes in parallel.
// Returns a std::vector whose element type is the type of the expression func().
49 template<
class F>
auto Map(
F func,
unsigned nTimes) -> std::vector<decltype(func())>;
// Map overload taking a generic container by non-const reference.
// The trailing return type names args.begin(), args.end() and args.front(),
// so this overload is only viable for types T providing those members.
// The result is a std::vector of whatever func(args.front()) yields.
50 template<
class F,
class T>
auto Map(
F func,
T &args) -> std::vector < decltype(++(args.begin()), args.end(),
func(args.front())) >;
// Map overload taking the arguments as a std::initializer_list<T> by value.
// The result is a std::vector of whatever func(*args.begin()) yields.
53 template<
class F,
class T>
auto Map(
F func, std::initializer_list<T> args) -> std::vector<decltype(
func(*args.begin()))>;
// Map overload taking the arguments as a std::vector<T> by non-const reference.
// The result is a std::vector of whatever func(args.front()) yields.
54 template<
class F,
class T>
auto Map(
F func, std::vector<T> &args) -> std::vector<decltype(
func(args.front()))>;
// This method behaves just like Map, but an additional redfunc function must
// be provided. The declared return type is decltype(func()), i.e. a single
// value of func's result type rather than a vector of them.
61 template<
class F,
class R>
auto MapReduce(
F func,
unsigned nTimes,
R redfunc) -> decltype(
func());
// MapReduce overload taking a generic container by non-const reference.
// The trailing return type names args.begin(), args.end() and args.front(),
// so this overload is only viable for types T providing those members.
// The declared return type is that of func(args.front()).
62 template<
class F,
class T,
class R>
auto MapReduce(
F func,
T &args,
R redfunc) -> decltype(++(args.begin()), args.end(),
func(args.front()));
// MapReduce overload taking the arguments as a std::initializer_list<T>.
// The declared return type is that of func(*args.begin()).
65 template<
class F,
class T,
class R>
auto MapReduce(
F func, std::initializer_list<T> args,
R redfunc) -> decltype(
func(*args.begin()));
// MapReduce overload taking the arguments as a std::vector<T> by non-const
// reference. The declared return type is that of func(args.front()).
66 template<
class F,
class T,
class R>
auto MapReduce(
F func, std::vector<T> &args,
R redfunc) -> decltype(
func(args.front()));
// ProcTree overload taking a list of file names.
// procFunc must be invocable with std::reference_wrapper<TTreeReader>; the
// type it returns is the return type of ProcTree.
// treeName defaults to "" and nToProcess to 0. NOTE(review): the meaning of
// these default values is not visible in this excerpt -- confirm.
71 template<
class F>
auto ProcTree(
const std::vector<std::string>& fileNames,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
// ProcTree overload taking a single file name.
// procFunc must be invocable with std::reference_wrapper<TTreeReader>; the
// type it returns is the return type of ProcTree.
// treeName defaults to "" and nToProcess to 0. NOTE(review): the meaning of
// these default values is not visible in this excerpt -- confirm.
72 template<
class F>
auto ProcTree(
const std::string& fileName,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
// ProcTree overload taking the input files as a TFileCollection.
// procFunc must be invocable with std::reference_wrapper<TTreeReader>; the
// type it returns is the return type of ProcTree.
// treeName defaults to "" and nToProcess to 0. NOTE(review): the meaning of
// these default values is not visible in this excerpt -- confirm.
73 template<
class F>
auto ProcTree(
TFileCollection& files,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
// ProcTree overload taking the input files as a TChain.
// procFunc must be invocable with std::reference_wrapper<TTreeReader>; the
// type it returns is the return type of ProcTree.
// treeName defaults to "" and nToProcess to 0. NOTE(review): the meaning of
// these default values is not visible in this excerpt -- confirm.
74 template<
class F>
auto ProcTree(
TChain& files,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
// ProcTree overload taking a TTree directly; unlike the file-based overloads
// it has no treeName parameter.
// procFunc must be invocable with std::reference_wrapper<TTreeReader>; the
// type it returns is the return type of ProcTree.
// nToProcess defaults to 0. NOTE(review): the meaning of 0 is not visible in
// this excerpt -- confirm.
75 template<
class F>
auto ProcTree(
TTree&
tree,
F procFunc,
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
// Listen for messages sent by the workers and call the appropriate handler
// function. reslist is taken by non-const reference so results can be
// appended to it.
81 template<
class T>
void Collect(std::vector<T> &reslist);
// Check that redfunc has the right signature and call it on objs.
// Returns a single T produced from the vector of T.
85 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
// Excerpt from the body of a Map-style method that calls func() with no
// arguments. NOTE(review): the signature and several statements are missing
// from this excerpt (the embedded listing numbers are non-contiguous).
// retType is the type produced by a single call to func().
117 using retType = decltype(
func());
// Remember the configured worker count; it is restored right after Fork().
123 unsigned oldNWorkers = GetNWorkers();
// NOTE(review): the statement guarded by this condition is not shown here;
// presumably it lowers the worker count to nTimes -- confirm.
124 if (nTimes < oldNWorkers)
127 bool ok = Fork(worker);
128 SetNWorkers(oldNWorkers);
// Error path (the guarding condition is not part of this excerpt): report on
// stderr and return an empty result vector.
131 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
132 return std::vector<retType>();
// fNToProcess is set to nTimes and reslist reserves that many slots.
136 fNToProcess = nTimes;
137 std::vector<retType> reslist;
138 reslist.reserve(fNToProcess);
// kNoTask: no task is being executed.
146 fTask = ETask::kNoTask;
// Definition of the Map overload for a generic container T.
// It builds a std::vector<typename T::value_type> from args and forwards to
// Map(func, vargs).
// NOTE(review): the braces and remaining statements of this definition are
// missing from this excerpt.
158 template<
class F,
class T>
159 auto TProcPool::Map(
F func,
T &args) -> std::vector < decltype(++(args.begin()), args.end(),
func(args.front())) >
// NOTE(review): the move iterators move the elements out of the caller's
// container (args is a non-const reference), leaving them moved-from after
// the call. Confirm that callers expect this.
161 std::vector<typename T::value_type> vargs(
162 std::make_move_iterator(std::begin(args)),
163 std::make_move_iterator(std::end(args))
// The returned vector is bound to a const reference.
165 const auto &reslist = Map(func, vargs);
// Excerpt from a Map-style method whose args object exposes GetSize() and is
// iterable. NOTE(review): the signature and several statements are missing
// from this excerpt. The string below is the tail of a compile-time check
// that func returns a pointer to TObject or a derived class.
177 "func should return a pointer to TObject or derived classes");
// One TObject* slot per element of args.
180 std::vector<TObject *> vargs(args.
GetSize());
181 auto it = vargs.begin();
// NOTE(review): the loop body is not shown; presumably it stores each element
// of args through `it` -- confirm.
182 for (
auto o : args) {
// Forward to Map(func, vargs).
188 const auto &reslist =
Map(func, vargs);
// Iterate over the results (the loop body is not part of this excerpt).
192 for (
const auto &res : reslist)
// Definition of the Map overload for std::initializer_list<T>.
// It builds a std::vector<T> from the list and forwards to Map(func, vargs).
// NOTE(review): the braces and remaining statements of this definition are
// missing from this excerpt.
198 template<
class F,
class T>
199 auto TProcPool::Map(
F func, std::initializer_list<T> args) -> std::vector<decltype(
func(*args.begin()))>
// NOTE(review): std::move on a std::initializer_list does not move its
// elements (they are const), so the vector still copies each element.
201 std::vector<T> vargs(std::move(args));
202 const auto &reslist = Map(func, vargs);
// Definition of the Map overload for std::vector<T>.
// NOTE(review): the braces and several statements of this definition are
// missing from this excerpt (the embedded listing numbers are non-contiguous).
209 template<
class F,
class T>
210 auto TProcPool::Map(
F func, std::vector<T> &args) -> std::vector<decltype(
func(args.front()))>
// retType is the type produced by func applied to one element of args.
213 using retType = decltype(
func(args.front()));
// kMapWithArg: a Map method with arguments is being executed.
216 fTask = ETask::kMapWithArg;
// Use at most args.size() workers for this call; the previous worker count is
// restored right after Fork().
220 unsigned oldNWorkers = GetNWorkers();
221 if (args.size() < oldNWorkers)
222 SetNWorkers(args.size());
224 bool ok = Fork(worker);
225 SetNWorkers(oldNWorkers);
// Error path (the guarding condition is not part of this excerpt): report on
// stderr and return an empty result vector.
228 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
229 return std::vector<retType>();
// fNToProcess is the number of arguments; reslist reserves that many slots.
233 fNToProcess = args.size();
234 std::vector<retType> reslist;
235 reslist.reserve(fNToProcess);
// range holds the indices 0 .. fNToProcess-1.
236 std::vector<unsigned> range(fNToProcess);
237 std::iota(range.begin(), range.end(), 0);
// kNoTask: no task is being executed.
245 fTask = ETask::kNoTask;
// Excerpt from the definition of a MapReduce-style method that calls func()
// with no arguments. NOTE(review): the signature line, braces and several
// statements are missing from this excerpt.
258 template<
class F,
class R>
// retType is the type produced by a single call to func().
261 using retType = decltype(
func());
// kMapRed: a MapReduce method with no arguments is being executed.
264 fTask = ETask::kMapRed;
// Remember the configured worker count; it is restored right after Fork().
267 unsigned oldNWorkers = GetNWorkers();
// NOTE(review): the statement guarded by this condition is not shown here;
// presumably it lowers the worker count to nTimes -- confirm.
268 if (nTimes < oldNWorkers)
271 bool ok = Fork(worker);
272 SetNWorkers(oldNWorkers);
// Error path (the guarding condition and the return are not in this excerpt).
274 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
// fNToProcess is set to nTimes and reslist reserves that many slots.
279 fNToProcess = nTimes;
280 std::vector<retType> reslist;
281 reslist.reserve(fNToProcess);
// kNoTask: no task is being executed. The result of the method is whatever
// redfunc returns for the collected results.
289 fTask = ETask::kNoTask;
290 return redfunc(reslist);
// Excerpt from the definition of a MapReduce-style method for a generic
// container T. NOTE(review): the signature line, braces and remaining
// statements are missing from this excerpt.
299 template<
class F,
class T,
class R>
// NOTE(review): the move iterators move the elements out of args; if args is
// a non-const reference to the caller's container, its elements are left
// moved-from after the call -- confirm this is intended.
302 std::vector<typename T::value_type> vargs(
303 std::make_move_iterator(std::begin(args)),
304 std::make_move_iterator(std::end(args))
// Forward to MapReduce(func, vargs, redfunc). Despite its name, reslist holds
// that call's return value.
306 const auto &reslist = MapReduce(func, vargs, redfunc);
// Excerpt from the definition of a MapReduce-style method whose args object
// exposes GetSize() and is iterable. NOTE(review): the signature line, braces
// and several statements are missing from this excerpt.
311 template<
class F,
class R>
// One TObject* slot per element of args.
315 std::vector<TObject *> vargs(args.GetSize());
316 auto it = vargs.begin();
// NOTE(review): the loop body is not shown; presumably it stores each element
// of args through `it` -- confirm.
317 for (
auto o : args) {
// Forward to MapReduce(func, vargs, redfunc).
323 auto res = MapReduce(func, vargs, redfunc);
// Excerpt from the definition of a MapReduce-style method that builds a
// std::vector<T> from args and forwards to MapReduce(func, vargs, redfunc).
// NOTE(review): the signature line, braces and remaining statements are
// missing from this excerpt.
329 template<
class F,
class T,
class R>
// NOTE(review): if args is a std::initializer_list<T>, std::move does not
// move its elements (they are const), so the vector still copies them.
332 std::vector<T> vargs(std::move(args));
333 const auto &reslist = MapReduce(func, vargs, redfunc);
// Excerpt from the definition of a MapReduce-style method that takes its
// arguments in a container exposing size() and front(). NOTE(review): the
// signature line, braces and several statements are missing from this excerpt.
338 template<
class F,
class T,
class R>
// retType is the type produced by func applied to one element of args.
341 using retType = decltype(
func(args.front()));
// kMapRedWithArg: a MapReduce method with arguments is being executed.
344 fTask = ETask::kMapRedWithArg;
// Use at most args.size() workers for this call; the previous worker count is
// restored right after Fork().
347 unsigned oldNWorkers = GetNWorkers();
348 if (args.size() < oldNWorkers)
349 SetNWorkers(args.size());
351 bool ok = Fork(worker);
352 SetNWorkers(oldNWorkers);
// Error path (the guarding condition and the return are not in this excerpt).
354 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
// fNToProcess is the number of arguments; reslist reserves that many slots.
359 fNToProcess = args.size();
360 std::vector<retType> reslist;
361 reslist.reserve(fNToProcess);
// range holds the indices 0 .. fNToProcess-1.
362 std::vector<unsigned> range(fNToProcess);
363 std::iota(range.begin(), range.end(), 0);
// kNoTask: no task is being executed. The result of the method is whatever
// redfunc returns for the collected results.
370 fTask = ETask::kNoTask;
371 return redfunc(reslist);
// Definition of the ProcTree overload taking a list of file names.
// NOTE(review): the template header, braces and several statements are
// missing from this excerpt (the embedded listing numbers are non-contiguous).
377 auto TProcPool::ProcTree(
const std::vector<std::string>& fileNames,
F procFunc,
const std::string& treeName,
ULong64_t nToProcess) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type
// retType is the type procFunc returns when invoked with
// std::reference_wrapper<TTreeReader>.
379 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
384 unsigned nWorkers = GetNWorkers();
388 bool ok = Fork(worker);
// Error path (the guarding condition and the return are not in this excerpt).
390 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
// Fewer files than workers: kProcByRange (each worker will process a certain
// range of each file), with nWorkers * fileNames.size() items to process.
394 if(fileNames.size() < nWorkers) {
396 fTask = ETask::kProcByRange;
398 fNToProcess = nWorkers*fileNames.size();
// args holds the indices 0 .. nWorkers-1.
399 std::vector<unsigned> args(nWorkers);
400 std::iota(args.begin(), args.end(), 0);
// Warn when fewer than nWorkers items have been handed out.
402 if(fNProcessed < nWorkers)
403 std::cerr <<
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.\n";
// kProcByFile (each worker will process a different file), with one item per
// file. NOTE(review): the line introducing this branch is not shown;
// presumably it is the else of the condition above -- confirm.
406 fTask = ETask::kProcByFile;
407 fNToProcess = fileNames.size();
408 std::vector<unsigned> args(nWorkers);
409 std::iota(args.begin(), args.end(), 0);
411 if(fNProcessed < nWorkers)
412 std::cerr <<
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.\n";
// Results are gathered as TObject pointers.
416 std::vector<TObject*> reslist;
// kNoTask: no task is being executed. res (defined in a line not shown here)
// is cast to retType for the caller.
424 fTask = ETask::kNoTask;
425 return static_cast<retType
>(res);
// Definition of the ProcTree overload taking a single file name.
// It wraps fileName in a one-element vector and forwards to the overload that
// takes a vector of file names.
// NOTE(review): the template header and braces are missing from this excerpt.
430 auto TProcPool::ProcTree(
const std::string& fileName,
F procFunc,
const std::string& treeName,
ULong64_t nToProcess) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type
432 std::vector<std::string> singleFileName(1, fileName);
433 return ProcTree(singleFileName, procFunc, treeName, nToProcess);
// Excerpt from a ProcTree-style method that converts `files` into a vector of
// file names and forwards to the overload taking file names.
// NOTE(review): the signature, the declaration of `count` and the braces are
// missing from this excerpt.
// One slot per file reported by files.GetNFiles().
440 std::vector<std::string> fileNames(files.GetNFiles());
// Each element of files.GetList() is treated as a TFileInfo; the file part of
// its current URL is stored as the name.
442 for(
auto f : *static_cast<THashList*>(files.GetList()))
443 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetFile();
445 return ProcTree(fileNames, procFunc, treeName, nToProcess);
// Excerpt from a ProcTree-style method that converts the file list of `files`
// into a vector of file names and forwards to the overload taking file names.
// NOTE(review): the signature, the loop body and the braces are missing from
// this excerpt.
452 TObjArray* filelist = files.GetListOfFiles();
// One slot per entry of the file list.
453 std::vector<std::string> fileNames(filelist->
GetEntries());
// NOTE(review): the loop body is not shown; presumably it fills fileNames
// from each entry -- confirm.
455 for(
auto f : *filelist)
458 return ProcTree(fileNames, procFunc, treeName, nToProcess);
// Excerpt from the body of a ProcTree-style method that splits the work into
// one item per worker. NOTE(review): the signature, braces and several
// statements are missing from this excerpt.
// retType is the type procFunc returns when invoked with
// std::reference_wrapper<TTreeReader>.
465 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
470 unsigned nWorkers = GetNWorkers();
474 bool ok = Fork(worker);
// Error path (the guarding condition and the return are not in this excerpt).
476 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
// kProcByRange with exactly nWorkers items to process.
481 fTask = ETask::kProcByRange;
484 fNToProcess = nWorkers;
// args holds the indices 0 .. nWorkers-1.
485 std::vector<unsigned> args(nWorkers);
486 std::iota(args.begin(), args.end(), 0);
// Warn when fewer than nWorkers items have been handed out.
488 if(fNProcessed < nWorkers)
489 std::cerr <<
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.\n";
// Results are gathered as TObject pointers.
492 std::vector<TObject*> reslist;
// kNoTask: no task is being executed. res (defined in a line not shown here)
// is cast to retType for the caller.
500 fTask = ETask::kNoTask;
501 return static_cast<retType
>(res);
// Excerpt from a message-dispatch routine. NOTE(review): this is presumably
// part of Collect; the enclosing definition and the condition guarding this
// first branch are not shown -- confirm.
// Report that the connection to a worker was lost.
517 std::cerr <<
"[E][C] Lost connection to a worker\n";
// Message codes below 1000 take a separate branch (its body is not shown).
519 }
else if (msg.first < 1000)
// Excerpt from a routine that handles a worker message by its code.
// NOTE(review): this is presumably part of HandlePoolCode; the signature, the
// conditions selecting each branch and the replies to the worker are missing
// from this excerpt -- confirm.
// msg.first is the message code; msg.second holds the optional buffer.
532 unsigned code = msg.first;
// Deserialize a T from the buffer and append it to the results.
// NOTE(review): if ReadBuffer<T> returns by value, the std::move is redundant.
534 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
// Same as above, but only when the message actually carries a buffer.
539 if(msg.second !=
nullptr)
540 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
// Worker-side error: the buffer is read as a C string and printed; execution
// continues.
543 const char *str = ReadBuffer<const char*>(msg.second.get());
544 std::cerr <<
"[E][C] a worker encountered an error: " << str <<
"\n"
545 <<
"Continuing execution ignoring these entries.\n";
// Any other code is reported as a warning.
550 std::cerr <<
"[W][C] unknown code received from server. code=" << code <<
"\n";
// Check that redfunc has the right signature and call it on objs.
// NOTE(review): the signature line and braces of this definition are missing
// from this excerpt.
555 template<
class T,
class R>
// Compile-time check: redfunc(objs) must yield exactly T.
559 static_assert(std::is_same<decltype(redfunc(objs)),
T>::
value,
"redfunc does not have the correct signature");
561 return redfunc(objs);
a MapReduce method with no arguments is being executed
virtual const char * GetTitle() const
Returns title of object.
The message contains the result of a function execution.
TObject * ReduceObjects(const std::vector< TObject * > &objs)
Merge collection of TObjects.
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
T Reduce(const std::vector< T > &objs, R redfunc)
Check that redfunc has the right signature and call it on objs.
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
a MapReduce method with arguments is being executed
ETask
A collection of the types of tasks that TProcPool can execute.
auto Map(F func, unsigned nTimes) -> std::vector< decltype(func())>
Execute func (with no arguments) nTimes in parallel.
Error while reading from the socket.
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
We are ready for the next task.
no task is being executed
unsigned GetNWorkers() const
void Remove(TSocket *s)
Remove a certain socket from the monitor.
unsigned GetNWorkers() const
enum TProcPool::ETask fTask
the kind of task that is being executed, if any
a ProcTree method is being executed and each worker will process a certain range of each file ...
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
unsigned fNToProcess
total number of arguments to pass to the workers
This class provides a simple interface to execute the same task multiple times in parallel...
TSocket * Select()
Return pointer to socket for which an event is waiting.
TProcPool(unsigned nWorkers=0)
Class constructor.
Collection abstract base class.
TProcPool & operator=(const TProcPool &)=delete
Execute function with the argument contained in the message.
void Reset(Detail::TBranchProxy *x)
The message contains the result of the processing of a TTree.
void SetNWorkers(unsigned n)
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function without arguments.
Used by the client to tell servers to shutdown.
Tell the client there was an error while processing.
virtual Int_t GetSize() const
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
unsigned long long ULong64_t
This class works together with TProcPool to allow the execution of functions in server processes...
double func(double *x, double *p)
Base class for multiprocess applications' clients.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Int_t GetEntries() const
Return the number of objects in array (i.e.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
a ProcTree method is being executed and each worker will process a different file ...
Mother of all ROOT objects.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with no arguments is being executed
auto MapReduce(F func, unsigned nTimes, R redfunc) -> decltype(func())
This method behaves just like Map, but an additional redfunc function must be provided.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
A TTree object has a header with a name and a title.
a Map method with arguments is being executed
Class describing a generic file including meta information.
virtual void ActivateAll()
Activate all de-activated sockets.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Reset()
Reset TProcPool's state.
unsigned fNProcessed
number of arguments already passed to the workers