12 #ifndef ROOT_TProcessExecutor 13 #define ROOT_TProcessExecutor 33 #include <type_traits> 48 template<
class F,
class Cond = noReferenceCond<F>>
51 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
53 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
56 using TExecutor<TProcessExecutor>::Map;
60 template<
class F>
auto ProcTree(
const std::vector<std::string>& fileNames,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
61 template<
class F>
auto ProcTree(
const std::string& fileName,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
62 template<
class F>
auto ProcTree(
TFileCollection& files,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
63 template<
class F>
auto ProcTree(
TChain& files,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
64 template<
class F>
auto ProcTree(
TTree&
tree,
F procFunc,
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
75 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
78 template<
class T>
void Collect(std::vector<T> &reslist);
81 void FixLists(std::vector<TObject*> &lists);
92 enum class ETask : unsigned char {
111 template<
class F,
class Cond>
114 using retType = decltype(
func());
121 if (nTimes < oldNWorkers)
124 bool ok =
Fork(worker);
128 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
129 return std::vector<retType>();
134 std::vector<retType> reslist;
135 reslist.reserve(fNToProcess);
152 template<
class F,
class T,
class Cond>
156 using retType = decltype(
func(args.front()));
164 if (args.size() < oldNWorkers)
167 bool ok =
Fork(worker);
171 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
172 return std::vector<retType>();
177 std::vector<retType> reslist;
178 reslist.reserve(fNToProcess);
179 std::vector<unsigned> range(fNToProcess);
180 std::iota(range.begin(), range.end(), 0);
192 template<
class F,
class INTEGER,
class Cond>
195 std::vector<INTEGER> vargs(args.
size());
196 std::copy(args.
begin(), args.
end(), vargs.begin());
197 const auto &reslist =
Map(func, vargs);
203 template<
class T,
class R>
207 static_assert(std::is_same<decltype(redfunc(objs)),
T>::value,
"redfunc does not have the correct signature");
208 return redfunc(objs);
214 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
215 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
223 bool ok =
Fork(worker);
225 Error(
"TProcessExecutor::ProcTree",
"[E][C] Could not fork. Aborting operation.");
229 if(fileNames.size() < nWorkers) {
234 std::vector<unsigned> args(nWorkers);
235 std::iota(args.begin(), args.end(), 0);
237 if(fNProcessed < nWorkers)
238 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
243 std::vector<unsigned> args(nWorkers);
244 std::iota(args.begin(), args.end(), 0);
246 if(fNProcessed < nWorkers)
247 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
251 std::vector<TObject*> reslist;
256 auto res = redfunc(reslist);
261 return static_cast<retType
>(res);
268 std::vector<std::string> singleFileName(1, fileName);
269 return ProcTree(singleFileName, procFunc, treeName, nToProcess);
276 std::vector<std::string> fileNames(files.GetNFiles());
278 for(
auto f : *static_cast<THashList*>(files.GetList()))
279 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
281 return ProcTree(fileNames, procFunc, treeName, nToProcess);
288 TObjArray* filelist = files.GetListOfFiles();
289 std::vector<std::string> fileNames(filelist->
GetEntries());
291 for(
auto f : *filelist)
292 fileNames[count++] =
f->GetTitle();
294 return ProcTree(fileNames, procFunc, treeName, nToProcess);
301 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
302 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
310 bool ok =
Fork(worker);
312 Error(
"TProcessExecutor::ProcTree",
"[E][C] Could not fork. Aborting operation.");
321 std::vector<unsigned> args(nWorkers);
322 std::iota(args.begin(), args.end(), 0);
324 if(fNProcessed < nWorkers)
325 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
328 std::vector<TObject*> reslist;
333 auto res = redfunc(reslist);
338 return static_cast<retType
>(res);
346 unsigned code = msg.first;
348 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
353 if(msg.second !=
nullptr)
354 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
357 const char *str = ReadBuffer<const char*>(msg.second.get());
358 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 359 "Continuing execution ignoring these entries.", str);
364 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
381 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
383 }
else if (msg.first < 1000)
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
The message contains the result of a function execution.
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
This namespace contains pre-defined functions to be used in conjuction with TExecutor::Map and TExecu...
unsigned GetNWorkers() const
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
unsigned fNProcessed
number of arguments already passed to the workers
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
We are ready for the next task.
void Remove(TSocket *s)
Remove a certain socket from the monitor.
a Map method with arguments is being executed
Merge collection of TObjects.
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
TSocket * Select()
Return pointer to socket for which an event is waiting.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function with the argument contained in the message.
a ProcTree method is being executed and each worker will process a certain range of each file ...
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
The message contains the result of the processing of a TTree.
no task is being executed
Execute function without arguments.
T Reduce(const std::vector< T > &objs, R redfunc)
Used by the client to tell servers to shutdown.
Tell the client there was an error while processing.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
unsigned long long ULong64_t
This class works together with TProcessExecutor to allow the execution of functions in server process...
double func(double *x, double *p)
Base class for multiprocess applications' clients.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Reset()
Reset TProcessExecutor's state.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
a ProcTree method is being executed and each worker will process a different file ...
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containg TTree objects.
Int_t GetEntries() const
Return the number of objects in array (i.e.
A TTree object has a header with a name and a title.
Class describing a generic file including meta information.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
ETask
A collection of the types of tasks that TProcessExecutor can execute.
void FixLists(std::vector< TObject *> &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.