Logo ROOT  
Reference Guide
TMPWorkerExecutor.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerExecutor
13#define ROOT_TMPWorkerExecutor
14
15#include "MPCode.h"
16#include "MPSendRecv.h"
17#include "PoolUtils.h"
18#include "TMPWorker.h"
19#include <string>
20#include <vector>
21
22//////////////////////////////////////////////////////////////////////////
23///
24/// \class TMPWorkerExecutor
25///
26/// This class works together with TProcessExecutor to allow the execution of
27/// functions in server processes. Depending on the exact task that the
28/// worker is required to execute, a different version of the class
29/// can be called.
30///
31/// ### TMPWorkerExecutor<F, T, R>
32/// The most general case, used by
33/// TProcessExecutor::MapReduce(F func, T& args, R redfunc).
34/// This worker is build with:
35/// * a function of signature F (the one to be executed)
36/// * a collection of arguments of type T on which to apply the function
37/// * a reduce function with signature R to be used to squash many
38/// returned values together.
39///
40/// ### Partial specializations
41/// A few partial specializations are provided for less general cases:
42/// * TMPWorkerExecutor<F, T, void> handles the case of a function that takes
43/// one argument and does not perform reduce operations
44/// (TProcessExecutor::Map(F func, T& args)).
45/// * TMPWorkerExecutor<F, void, R> handles the case of a function that takes
46/// no arguments, to be executed a specified amount of times, which
47/// returned values are squashed together (reduced)
48/// (TProcessExecutor::Map(F func, unsigned nTimes, R redfunc))
49/// * TMPWorkerExecutor<F, void, void> handles the case of a function that takes
50/// no arguments and whose arguments are not "reduced"
51/// (TProcessExecutor::Map(F func, unsigned nTimes))
52///
53/// Since all the important data are passed to TMPWorkerExecutor at construction
54/// time, the kind of messages that client and workers have to exchange
55/// are usually very simple.
56///
57//////////////////////////////////////////////////////////////////////////
58
59// Quick guide to TMPWorkerExecutor:
60// For each TProcessExecutor::Map and TProcessExecutor::MapReduce signature
61// there's a corresponding
62// specialization of TMPWorkerExecutor:
63// * Map(func, nTimes) --> TMPWorkerExecutor<F, void, void>
64// * Map(func, args) --> TMPWorkerExecutor<F, T, void>
65// * MapReduce(func, nTimes, redfunc) --> TMPWorkerExecutor<F, void, R>
66// * MapReduce(func, args, redfunc) --> TMPWorkerExecutor<F, T, R>
67// I thought about having four different classes (with different names)
68// instead of four specializations of the same class template, but it really
69// makes no difference in the end since the different classes would be class
70// templates anyway, and I would have to find a meaningful name for each one.
71// About code replication: looking carefully, it can be noticed that there's
72// very little code replication since the different versions of TMPWorkerExecutor
73// all behave slightly differently, in incompatible ways (e.g. they all need
74// different data members, different signatures for the ctors, and so on).
75
76template<class F, class T = void, class R = void>
78public:
79 // TProcessExecutor is in charge of checking the signatures for incompatibilities:
80 // we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0])
81 // TODO document somewhere that fReducedResult must have a default ctor
82 TMPWorkerExecutor(F func, const std::vector<T> &args, R redfunc) :
83 TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc),
85 {}
87
88 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
89 {
90 unsigned code = msg.first;
91 TSocket *s = GetSocket();
92 std::string reply = "S" + std::to_string(GetNWorker());
93 if (code == MPCode::kExecFuncWithArg) {
94 unsigned n;
95 msg.second->ReadUInt(n);
96 // execute function on argument n
97 const auto &res = fFunc(fArgs[n]);
98 // tell client we're done
100 // reduce arguments if possible
101 if (fCanReduce) {
102 using FINAL = decltype(fReducedResult);
103 using ORIGINAL = decltype(fRedFunc({res, fReducedResult}));
104 fReducedResult = ROOT::Internal::PoolUtils::ResultCaster<ORIGINAL, FINAL>::CastIfNeeded(fRedFunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
105 } else {
106 fCanReduce = true;
107 fReducedResult = res;
108 }
109 } else if (code == MPCode::kSendResult) {
111 } else {
112 reply += ": unknown code received: " + std::to_string(code);
113 MPSend(s, MPCode::kError, reply.c_str());
114 }
115 }
116
117private:
118 F fFunc; ///< the function to be executed
119 std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
120 R fRedFunc; ///< the reduce function
121 decltype(fFunc(fArgs.front())) fReducedResult; ///< the result of the execution
122 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
123};
124
125
126template<class F, class R>
127class TMPWorkerExecutor<F, void, R> : public TMPWorker {
128public:
129 TMPWorkerExecutor(F func, R redfunc) :
130 TMPWorker(), fFunc(func), fRedFunc(redfunc),
131 fReducedResult(), fCanReduce(false)
132 {}
134
135 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
136 {
137 unsigned code = msg.first;
138 TSocket *s = GetSocket();
139 std::string reply = "S" + std::to_string(GetNWorker());
140 if (code == MPCode::kExecFunc) {
141 // execute function
142 const auto &res = fFunc();
143 // tell client we're done
145 // reduce arguments if possible
146 if (fCanReduce) {
147 fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
148 } else {
149 fCanReduce = true;
150 fReducedResult = res;
151 }
152 } else if (code == MPCode::kSendResult) {
154 } else {
155 reply += ": unknown code received: " + std::to_string(code);
156 MPSend(s, MPCode::kError, reply.c_str());
157 }
158 }
159
160private:
161 F fFunc; ///< the function to be executed
162 R fRedFunc; ///< the reduce function
163 decltype(fFunc()) fReducedResult; ///< the result of the execution
164 bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
165};
166
167template<class F, class T>
168class TMPWorkerExecutor<F, T, void> : public TMPWorker {
169public:
170 TMPWorkerExecutor(F func, const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(std::move(args)) {}
172 void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
173 {
174 unsigned code = msg.first;
175 TSocket *s = GetSocket();
176 std::string reply = "S" + std::to_string(GetNWorker());
177 if (code == MPCode::kExecFuncWithArg) {
178 unsigned n;
179 msg.second->ReadUInt(n);
181 } else {
182 reply += ": unknown code received: " + std::to_string(code);
183 MPSend(s, MPCode::kError, reply.c_str());
184 }
185 }
186
187private:
188 F fFunc; ///< the function to be executed
189 std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
190};
191
192
193// doxygen should ignore this specialization
194/// \cond
195// The most generic class template is meant to handle functions that
196// must be executed by passing one argument to them.
197// This partial specialization is used to handle the case
198// of functions which must be executed without passing any argument.
199template<class F>
200class TMPWorkerExecutor<F, void, void> : public TMPWorker {
201public:
202 explicit TMPWorkerExecutor(F func) : TMPWorker(), fFunc(func) {}
204 void HandleInput(MPCodeBufPair &msg)
205 {
206 unsigned code = msg.first;
207 TSocket *s = GetSocket();
208 std::string myId = "S" + std::to_string(GetPid());
209 if (code == MPCode::kExecFunc) {
211 } else {
212 MPSend(s, MPCode::kError, (myId + ": unknown code received: " + std::to_string(code)).c_str());
213 }
214 }
215
216private:
217 F fFunc;
218};
219/// \endcond
220
221#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
typedef void((*Func_t)())
F fFunc
the function to be executed
TMPWorkerExecutor(F func, const std::vector< T > &args)
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
F fFunc
the function to be executed
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
This class works together with TProcessExecutor to allow the execution of functions in server process...
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
F fFunc
the function to be executed
R fRedFunc
the reduce function
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
decltype(fFunc(fArgs.front())) fReducedResult
the result of the execution
TMPWorkerExecutor(F func, const std::vector< T > &args, R redfunc)
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
unsigned GetNWorker() const
Definition: TMPWorker.h:42
TMPWorker()
Definition: TMPWorker.h:28
pid_t GetPid()
Definition: TMPWorker.h:41
TSocket * GetSocket()
Definition: TMPWorker.h:40
const Int_t n
Definition: legend1.C:16
#define F(x, y, z)
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
@ kError
Error message.
Definition: MPCode.h:47
@ kFuncResult
The message contains the result of a function execution.
Definition: MPCode.h:33
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition: MPCode.h:32
@ kExecFunc
Execute function without arguments.
Definition: MPCode.h:31
double T(double x)
Definition: ChebyshevPol.h:34
static constexpr double s