ROOT  6.06/09
Reference Guide
TPoolWorker.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TPoolWorker
13 #define ROOT_TPoolWorker
14 
15 #include "TMPWorker.h"
16 #include "PoolUtils.h"
17 #include "MPCode.h"
18 #include "MPSendRecv.h"
19 #include <string>
20 #include <vector>
21 
22 // Quick guide to TPoolWorker:
23 // For each TProcPool::Map and TProcPool::MapReduce signature
24 // there's a corresponding
25 // specialization of TPoolWorker:
26 // * Map(func, nTimes) --> TPoolWorker<F, void, void>
27 // * Map(func, args) --> TPoolWorker<F, T, void>
28 // * MapReduce(func, nTimes, redfunc) --> TPoolWorker<F, void, R>
29 // * MapReduce(func, args, redfunc) --> TPoolWorker<F, T, R>
30 // I thought about having four different classes (with different names)
31 // instead of four specializations of the same class template, but it really
32 // makes no difference in the end since the different classes would be class
33 // templates anyway, and I would have to find a meaningful name for each one.
34 // About code replication: looking carefully, it can be noticed that there's
35 // very little code replication since the different versions of TPoolWorker
36 // all behave slightly differently, in incompatible ways (e.g. they all need
37 // different data members, different signatures for the ctors, and so on).
38 
39 template<class F, class T = void, class R = void>
40 class TPoolWorker : public TMPWorker {
41 public:
42  // TProcPool is in charge of checking the signatures for incompatibilities:
43  // we trust that decltype(redfunc(std::vector<decltype(func(args[0]))>)) == decltype(args[0])
44  // TODO document somewhere that fReducedResult must have a default ctor
45  TPoolWorker(F func, const std::vector<T> &args, R redfunc) :
46  TMPWorker(), fFunc(func), fArgs(std::move(args)), fRedFunc(redfunc),
47  fReducedResult(), fCanReduce(false)
48  {}
50 
51  void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcPool client
52  {
53  unsigned code = msg.first;
54  TSocket *s = GetSocket();
55  std::string reply = "S" + std::to_string(GetNWorker());
56  if (code == PoolCode::kExecFuncWithArg) {
57  unsigned n;
58  msg.second->ReadUInt(n);
59  // execute function on argument n
60  const auto &res = fFunc(fArgs[n]);
61  // tell client we're done
63  // reduce arguments if possible
64  if (fCanReduce) {
65  fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
66  } else {
67  fCanReduce = true;
68  fReducedResult = res;
69  }
70  } else if (code == PoolCode::kSendResult) {
72  } else {
73  reply += ": unknown code received: " + std::to_string(code);
74  MPSend(s, MPCode::kError, reply.data());
75  }
76  }
77 
78 private:
79  F fFunc; ///< the function to be executed
80  std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
81  R fRedFunc; ///< the reduce function
82  decltype(fFunc(fArgs.front())) fReducedResult; ///< the result of the execution
83  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
84 };
85 
86 
87 template<class F, class R>
88 class TPoolWorker<F, void, R> : public TMPWorker {
89 public:
90  TPoolWorker(F func, R redfunc) :
91  TMPWorker(), fFunc(func), fRedFunc(redfunc),
92  fReducedResult(), fCanReduce(false)
93  {}
95 
96  void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcPool client
97  {
98  unsigned code = msg.first;
99  TSocket *s = GetSocket();
100  std::string reply = "S" + std::to_string(GetNWorker());
101  if (code == PoolCode::kExecFunc) {
102  // execute function
103  const auto &res = fFunc();
104  // tell client we're done
106  // reduce arguments if possible
107  if (fCanReduce) {
108  fReducedResult = fRedFunc({res, fReducedResult}); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
109  } else {
110  fCanReduce = true;
111  fReducedResult = res;
112  }
113  } else if (code == PoolCode::kSendResult) {
115  } else {
116  reply += ": unknown code received: " + std::to_string(code);
117  MPSend(s, MPCode::kError, reply.data());
118  }
119  }
120 
121 private:
122  F fFunc; ///< the function to be executed
123  R fRedFunc; ///< the reduce function
124  decltype(fFunc()) fReducedResult; ///< the result of the execution
125  bool fCanReduce; ///< true if fReducedResult can be reduced with a new result, false until we have produced one result
126 };
127 
128 template<class F, class T>
129 class TPoolWorker<F, T, void> : public TMPWorker {
130 public:
131  TPoolWorker(F func, const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(std::move(args)) {}
133  void HandleInput(MPCodeBufPair &msg) ///< Execute instructions received from a TProcPool client
134  {
135  unsigned code = msg.first;
136  TSocket *s = GetSocket();
137  std::string reply = "S" + std::to_string(GetNWorker());
138  if (code == PoolCode::kExecFuncWithArg) {
139  unsigned n;
140  msg.second->ReadUInt(n);
141  MPSend(s, PoolCode::kFuncResult, fFunc(fArgs[n]));
142  } else {
143  reply += ": unknown code received: " + std::to_string(code);
144  MPSend(s, MPCode::kError, reply.data());
145  }
146  }
147 
148 private:
149  F fFunc; ///< the function to be executed
150  std::vector<T> fArgs; ///< a vector containing the arguments that must be passed to fFunc
151 };
152 
153 
154 // doxygen should ignore this specialization
155 /// \cond
156 // The most generic class template is meant to handle functions that
157 // must be executed by passing one argument to them.
158 // This partial specialization is used to handle the case
159 // of functions which must be executed without passing any argument.
160 template<class F>
161 class TPoolWorker<F, void, void> : public TMPWorker {
162 public:
163  explicit TPoolWorker(F func) : TMPWorker(), fFunc(func) {}
164  ~TPoolWorker() {}
165  void HandleInput(MPCodeBufPair &msg)
166  {
167  unsigned code = msg.first;
168  TSocket *s = GetSocket();
169  std::string myId = "S" + std::to_string(GetPid());
170  if (code == PoolCode::kExecFunc) {
172  } else {
173  MPSend(s, MPCode::kError, (myId + ": unknown code received: " + std::to_string(code)).c_str());
174  }
175  }
176 
177 private:
178  F fFunc;
179 };
180 /// \endcond
181 
182 #endif
pid_t GetPid()
Definition: TMPWorker.h:34
The message contains the result of a function execution.
Definition: PoolUtils.h:31
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TPoolWorker.h:133
F fFunc
the function to be executed
Definition: TPoolWorker.h:79
Small helper to encapsulate whether to return the value pointed to by the iterator or its address...
double T(double x)
Definition: ChebyshevPol.h:34
R fRedFunc
the reduce function
Definition: TPoolWorker.h:81
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
Error message.
Definition: MPCode.h:30
STL namespace.
ClassImp(TIterator) Bool_t TIterator return false
Compare two iterator objects.
Definition: TIterator.cxx:20
We are ready for the next task.
Definition: PoolUtils.h:33
unsigned GetNWorker() const
Definition: TMPWorker.h:35
R fRedFunc
the reduce function
Definition: TPoolWorker.h:123
decltype(fFunc(fArgs.front())) fReducedResult
the result of the execution
Definition: TPoolWorker.h:82
F fFunc
the function to be executed
Definition: TPoolWorker.h:149
TMPWorker()
Class constructor.
Definition: TMPWorker.cxx:42
TSocket * GetSocket()
Definition: TMPWorker.h:33
Execute function with the argument contained in the message.
Definition: PoolUtils.h:30
F fFunc
the function to be executed
Definition: TPoolWorker.h:122
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:20
TPoolWorker(F func, const std::vector< T > &args)
Definition: TPoolWorker.h:131
Execute function without arguments.
Definition: PoolUtils.h:29
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TPoolWorker.h:96
Ask for a kFuncResult/kProcResult.
Definition: PoolUtils.h:34
This class works together with TProcPool to allow the execution of functions in server processes...
Definition: TPoolWorker.h:40
TPoolWorker(F func, R redfunc)
Definition: TPoolWorker.h:90
TPoolWorker(F func, const std::vector< T > &args, R redfunc)
Definition: TPoolWorker.h:45
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
Definition: TPoolWorker.h:80
std::vector< T > fArgs
a vector containing the arguments that must be passed to fFunc
Definition: TPoolWorker.h:150
void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TPoolWorker.h:51
const Int_t n
Definition: legend1.C:16
TRandom3 R
a TMatrixD.
Definition: testIO.cxx:28