Logo ROOT   6.12/07
Reference Guide
TMPWorkerExecutor.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TMPWorkerExecutor
13 #define ROOT_TMPWorkerExecutor
14 
15 #include "MPCode.h"
16 #include "MPSendRecv.h"
17 #include "PoolUtils.h"
18 #include "TMPWorker.h"
19 #include <string>
20 #include <vector>
21 
22 //////////////////////////////////////////////////////////////////////////
23 ///
24 /// \class TMPWorkerExecutor
25 ///
26 /// This class works together with TProcessExecutor to allow the execution of
27 /// functions in server processes. Depending on the exact task that the
28 /// worker is required to execute, a different version of the class
29 /// can be called.
30 ///
31 /// ### TMPWorkerExecutor<F, T, R>
32 /// The most general case, used by
33 /// TProcessExecutor::MapReduce(F func, T& args, R redfunc).
34 /// This worker is built with:
35 /// * a function of signature F (the one to be executed)
36 /// * a collection of arguments of type T on which to apply the function
37 /// * a reduce function with signature R to be used to squash many
38 /// returned values together.
39 ///
40 /// ### Partial specializations
41 /// A few partial specializations are provided for less general cases:
42 /// * TMPWorkerExecutor<F, T, void> handles the case of a function that takes
43 /// one argument and does not perform reduce operations
44 /// (TProcessExecutor::Map(F func, T& args)).
45 /// * TMPWorkerExecutor<F, void, R> handles the case of a function that takes
46 /// no arguments, to be executed a specified number of times, whose
47 /// returned values are squashed together (reduced)
48 /// (TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc))
49 /// * TMPWorkerExecutor<F, void, void> handles the case of a function that takes
50 /// no arguments and whose returned values are not "reduced"
51 /// (TProcessExecutor::Map(F func, unsigned nTimes))
52 ///
53 /// Since all the important data are passed to TMPWorkerExecutor at construction
54 /// time, the kind of messages that client and workers have to exchange
55 /// are usually very simple.
56 ///
57 //////////////////////////////////////////////////////////////////////////
58 
59 // Quick guide to TMPWorkerExecutor:
60 // For each TProcessExecutor::Map and TProcessExecutor::MapReduce signature
61 // there's a corresponding
62 // specialization of TMPWorkerExecutor:
63 // * Map(func, nTimes) --> TMPWorkerExecutor<F, void, void>
64 // * Map(func, args) --> TMPWorkerExecutor<F, T, void>
65 // * MapReduce(func, nTimes, redfunc) --> TMPWorkerExecutor<F, void, R>
66 // * MapReduce(func, args, redfunc) --> TMPWorkerExecutor<F, T, R>
67 // I thought about having four different classes (with different names)
68 // instead of four specializations of the same class template, but it really
69 // makes no difference in the end since the different classes would be class
70 // templates anyway, and I would have to find a meaningful name for each one.
71 // About code replication: looking carefully, it can be noticed that there's
72 // very little code replication since the different versions of TMPWorkerExecutor
73 // all behave slightly differently, in incompatible ways (e.g. they all need
74 // different data members, different signatures for the ctors, and so on).
75 
76 template<class F, class T = void, class R = void>
77 class TMPWorkerExecutor : public TMPWorker {
78 public:
79  // TProcessExecutor is in charge of checking the signatures for incompatibilities:
80  // we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0])
81  // TODO document somewhere that fReducedResult must have a default ctor
82  TMPWorkerExecutor(F func, const std::vector<T> &args, R redfunc) :
83  TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc),
84  fReducedResult(), fCanReduce(false)
85  {}
87 
88  void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
89  {
90  unsigned code = msg.first;
91  TSocket *s = GetSocket();
92  std::string reply = "S" + std::to_string(GetNWorker());
93  if (code == MPCode::kExecFuncWithArg) {
94  unsigned n;
95  msg.second->ReadUInt(n);
96  // execute function on argument n
97  const auto &res = fFunc(fArgs[n]);
98  // tell client we're done
100  // reduce arguments if possible
101  if (fCanReduce) {
102  using FINAL = decltype(fReducedResult);
103  using ORIGINAL = decltype(fRedFunc({res, fReducedResult}));
104  fReducedResult = ROOT::Internal::PoolUtils::ResultCaster<ORIGINAL, FINAL>::CastIfNeeded(fRedFunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
105  } else {
106  fCanReduce = true;
107  fReducedResult = res;
108  }
109  } else if (code == MPCode::kSendResult) {
111  } else {
112  reply += ": unknown code received: " + std::to_string(code);
113  MPSend(s, MPCode::kError, reply.c_str());
114  }
115  }
116 
117 private:
118  F fFunc; ///< the function to be executed
119  std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
120  R fRedFunc; ///< the reduce function
121  decltype(fFunc(fArgs.front())) fReducedResult; ///< the result of the execution
122  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
123 };
124 
125 
// Partial specialization for a function called without arguments whose
// results are combined with a reduce function. It stores the function and the
// reduce function and folds every result it computes into fReducedResult.
// NOTE(review): listing numbers 133, 144 and 153 are absent from this extract,
// so whatever statements sat on those lines are not shown below.
126 template<class F, class R>
127 class TMPWorkerExecutor<F, void, R> : public TMPWorker {
128 public:
    // Builds the worker from the function and the reduce function.
    // fReducedResult is value-initialized and fCanReduce starts as false,
    // i.e. "no result stored yet".
129  TMPWorkerExecutor(F func, R redfunc) :
130  TMPWorker(), fFunc(func), fRedFunc(redfunc),
131  fReducedResult(), fCanReduce(false)
132  {}
134 
    // Dispatches on the message code (msg.first):
    //  * MPCode::kExecFunc: run fFunc() and fold the result into fReducedResult;
    //  * MPCode::kSendResult: branch body not visible in this extract;
    //  * anything else: send MPCode::kError with a text made of "S", the
    //    worker number and the unknown code.
135  void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
136  {
137  unsigned code = msg.first;
138  TSocket *s = GetSocket();
139  std::string reply = "S" + std::to_string(GetNWorker());
140  if (code == MPCode::kExecFunc) {
141  // execute function
142  const auto &res = fFunc();
143  // tell client we're done
    // NOTE(review): listing line 144 is missing here; presumably the notification
    // to the client was sent on that line - confirm against the original header.
145  // reduce results if possible, i.e. if a previous result is already stored
146  if (fCanReduce) {
    // The reduce function receives the new result and the running result as a
    // braced list; its return value is assigned directly (no ResultCaster here,
    // unlike the primary template).
147  fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
148  } else {
    // First result: nothing to reduce with yet, just store it.
149  fCanReduce = true;
150  fReducedResult = res;
151  }
152  } else if (code == MPCode::kSendResult) {
    // NOTE(review): listing line 153 is missing, so this branch looks empty here;
    // presumably fReducedResult was sent back to the client - confirm.
154  } else {
155  reply += ": unknown code received: " + std::to_string(code);
156  MPSend(s, MPCode::kError, reply.c_str());
157  }
158  }
159 
160 private:
161  F fFunc; ///< the function to be executed
162  R fRedFunc; ///< the reduce function
163  decltype(fFunc()) fReducedResult; ///< the running reduced result; its type is the return type of fFunc()
164  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
165 };
166 
// Partial specialization for a function applied to one argument at a time,
// without any reduce step: each result is sent back to the client as soon as
// it has been computed.
// NOTE(review): listing number 171 is absent from this extract, so whatever
// statement sat on that line is not shown below.
167 template<class F, class T>
168 class TMPWorkerExecutor<F, T, void> : public TMPWorker {
169 public:
    // Builds the worker from the function and the arguments. args is taken by
    // const reference, so fArgs is copy-constructed from it (std::move on a
    // const reference would still select the copy constructor).
170  TMPWorkerExecutor(F func, const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(args) {}
    // Dispatches on the message code (msg.first):
    //  * MPCode::kExecFuncWithArg: read an index n from msg.second and send
    //    fFunc(fArgs[n]) back with code MPCode::kFuncResult;
    //  * anything else: send MPCode::kError with a text made of "S", the
    //    worker number and the unknown code.
172  void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcessExecutor client
173  {
174  unsigned code = msg.first;
175  TSocket *s = GetSocket();
176  std::string reply = "S" + std::to_string(GetNWorker());
177  if (code == MPCode::kExecFuncWithArg) {
178  unsigned n;
179  msg.second->ReadUInt(n);
    // n is taken from the message and used as an index without a range check
180  MPSend(s, MPCode::kFuncResult, fFunc(fArgs[n]));
181  } else {
182  reply += ": unknown code received: " + std::to_string(code);
183  MPSend(s, MPCode::kError, reply.c_str());
184  }
185  }
186 
187 private:
188  F fFunc; ///< the function to be executed
189  std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
190 };
191 
192 
193 // doxygen should ignore this specialization
194 /// \cond
195 // The most generic class template is meant to handle functions that
196 // must be executed by passing one argument to them.
197 // This partial specialization is used to handle the case
198 // of functions which must be executed without passing any argument.
// Partial specialization for a function called without arguments and without
// any reduce step. It only stores the function.
// NOTE(review): listing number 210 is absent from this extract, so the body of
// the kExecFunc branch is not shown below.
199 template<class F>
200 class TMPWorkerExecutor<F, void, void> : public TMPWorker {
201 public:
    // Builds the worker from the function to be executed.
202  explicit TMPWorkerExecutor(F func) : TMPWorker(), fFunc(func) {}
203  ~TMPWorkerExecutor() {}
    // Dispatches on the message code (msg.first):
    //  * MPCode::kExecFunc: branch body not visible in this extract;
    //  * anything else: send MPCode::kError with a text made of "S", the
    //    value of GetPid() and the unknown code.
204  void HandleInput(MPCodeBufPair &msg)
205  {
206  unsigned code = msg.first;
207  TSocket *s = GetSocket();
    // NOTE(review): the other TMPWorkerExecutor versions build this prefix from
    // GetNWorker(); here GetPid() is used - confirm whether that is intended.
208  std::string myId = "S" + std::to_string(GetPid());
209  if (code == MPCode::kExecFunc) {
    // NOTE(review): listing line 210 is missing, so this branch looks empty here;
    // presumably the result of fFunc() was sent back to the client - confirm.
211  } else {
212  MPSend(s, MPCode::kError, (myId + ": unknown code received: " + std::to_string(code)).c_str());
213  }
214  }
215 
216 private:
217  F fFunc; // the function to be executed
218 };
219 /// \endcond
220 
221 #endif
pid_t GetPid()
Definition: TMPWorker.h:44
F fFunc
the function to be executed
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
This class works together with TProcessExecutor to allow the execution of functions in server process...
double T(double x)
Definition: ChebyshevPol.h:34
TMPWorkerExecutor(F func, const std::vector< T > &args, R redfunc)
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error message.
Definition: MPCode.h:47
STL namespace.
decltype(fFunc(fArgs.front())) fReducedResult
the result of the execution
unsigned GetNWorker() const
Definition: TMPWorker.h:45
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result ...
#define F(x, y, z)
The message contains the result of a function execution.
Definition: MPCode.h:33
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Execute function with the argument contained in the message.
Definition: MPCode.h:32
TMPWorker()
Definition: TMPWorker.h:31
TSocket * GetSocket()
Definition: TMPWorker.h:43
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
R fRedFunc
the reduce function
static constexpr double s
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
TMPWorkerExecutor(F func, const std::vector< T > &args)
F fFunc
the function to be executed
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
constexpr Double_t R()
Definition: TMath.h:213
We are ready for the next task.
Definition: MPCode.h:35
const Int_t n
Definition: legend1.C:16
F fFunc
the function to be executed
Execute function without arguments.
Definition: MPCode.h:31