Logo ROOT   6.10/09
Reference Guide
TMPClient.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "MPCode.h"
13 #include "TGuiFactory.h" //gGuiFactory
14 #include "TError.h" //gErrorIgnoreLevel
15 #include "TMPClient.h"
16 #include "TMPWorker.h"
17 #include "TROOT.h" //gROOT
18 #include "TSocket.h"
19 #include "TSystem.h" //gSystem
20 #include "TVirtualX.h" //gVirtualX
21 #include <errno.h> //errno, used by socketpair
22 #include <memory> //unique_ptr
23 #include <sys/socket.h> //socketpair
24 #include <sys/wait.h> // waitpid
25 #include <unistd.h> // close, fork
26 #include <dlfcn.h>
27 
28 //////////////////////////////////////////////////////////////////////////
29 ///
30 /// \class TMPClient
31 ///
32 /// Base class for multiprocess applications' clients. It provides a
33 /// simple interface to fork a ROOT session into server/worker sessions
34 /// and exchange messages with them. Multiprocessing applications can build
35 /// on TMPClient and TMPWorker: the class providing multiprocess
36 /// functionalities to users should inherit (possibly privately) from
37 /// TMPClient, and the workers executing tasks should inherit from TMPWorker.
38 ///
39 //////////////////////////////////////////////////////////////////////////
40 
41 //////////////////////////////////////////////////////////////////////////
42 /// Class constructor.
43 /// \param nWorkers
44 /// \parblock
45 /// the number of children processes that will be created by
46 /// Fork, i.e. the number of workers that will be available after this call.
47 /// The default value (0) means that a number of workers equal to the number
48 /// of cores of the machine is going to be spawned. If that information is
49 /// not available, 2 workers are created instead.
50 /// \endparblock
51 TMPClient::TMPClient(unsigned nWorkers) : fIsParent(true), fWorkerPids(), fMon(), fNWorkers(0)
52 {
53  // decide on number of workers
54  if (nWorkers) {
55  fNWorkers = nWorkers;
56  } else {
57  SysInfo_t si;
58  if (gSystem->GetSysInfo(&si) == 0)
59  fNWorkers = si.fCpus;
60  else
61  fNWorkers = 2;
62  }
63 }
64 
65 
66 //////////////////////////////////////////////////////////////////////////
67 /// Class destructor.
68 /// This method is in charge of shutting down any remaining worker,
69 /// closing off connections and reap the terminated children processes.
71 {
74  l->Delete();
75  delete l;
77  l->Delete();
78  delete l;
79  fMon.RemoveAll();
80  ReapWorkers();
81 }
82 
83 namespace ROOT {
84  namespace Internal {
85  /// Class to acquire and release the Python GIL where it applies, i.e.
86  /// if libPython is loaded and the interpreter is initialized.
87  class TGILRAII {
88  using Py_IsInitialized_type = int (*)(void);
89  using PyGILState_Ensure_type = void* (*)(void);
90  using PyGILState_Release_type = void (*)(void*);
91  void* fPyGILState_STATE = nullptr;
92  template<class FPTYPE>
93  FPTYPE GetSymT(const char* name) {return (FPTYPE) dlsym(nullptr,name);}
94  public:
95  TGILRAII()
96  {
97  auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>("Py_IsInitialized");
98  if (!Py_IsInitialized || !Py_IsInitialized()) return;
99  auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>("PyGILState_Ensure");
100  if (PyGILState_Ensure) fPyGILState_STATE = PyGILState_Ensure();
101  }
102 
103  ~TGILRAII()
104  {
105  auto PyGILState_Release = GetSymT<PyGILState_Release_type>("PyGILState_Release");
106  if (fPyGILState_STATE && PyGILState_Release) PyGILState_Release(fPyGILState_STATE);
107  }
108  };
109  }
110 }
111 
112 //////////////////////////////////////////////////////////////////////////
113 /// This method forks the ROOT session into fNWorkers children processes.
114 /// The ROOT sessions spawned in this way will not have graphical
115 /// capabilities and will not read from standard input, but will be
116 /// connected to the original (interactive) session through TSockets.
117 /// The children processes' PIDs are added to the fWorkerPids vector.
118 /// The parent session can then communicate with the children using the
119 /// Broadcast and MPSend methods, and receive messages through MPRecv.\n
120 /// \param server
121 /// \parblock
122 /// A pointer to an instance of the class that will take control
123 /// of the subprocesses after forking. Applications should implement their
124 /// own class inheriting from TMPWorker. Behaviour can be customized
125 /// overriding TMPWorker::HandleInput.
126 /// \endparblock
127 /// \return true if Fork succeeded, false otherwise
129 {
130  std::string basePath = "/tmp/ROOTMP-";
131 
132  //fork as many times as needed and save pids
133  pid_t pid = 1; //must be positive to handle the case in which fNWorkers is 0
134  int sockets[2]; //sockets file descriptors
135  unsigned nWorker = 0;
136  for (; nWorker < fNWorkers; ++nWorker) {
137  //create socket pair
138  int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
139  if (ret != 0) {
140  Error("TMPClient::Fork", "[E][C] Could not create socketpair. Error n. . Now retrying.\n%d", errno);
141  --nWorker;
142  continue;
143  }
144 
145  //fork
146  {
147  ROOT::Internal::TGILRAII tgilraai;
148  pid = fork();
149  }
150 
151  if (!pid) {
152  //child process, exit loop. sockets[1] is the fd that should be used
153  break;
154  } else {
155  //parent process, create TSocket with current value of sockets[0]
156  close(sockets[1]); //we don't need this
157  TSocket *s = new TSocket(sockets[0], (std::to_string(pid)).c_str()); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
158  if (s && s->IsValid()) {
159  fMon.Add(s);
160  fWorkerPids.push_back(pid);
161  } else {
162  Error("TMPClient::Fork","[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
163  delete s;
164  }
165  }
166  }
167 
168  if (pid) {
169  //parent returns here
170  return true;
171  } else {
172  //CHILD/WORKER
173  fIsParent = false;
174  close(sockets[0]); //we don't need this
175 
176  //override signal handler (make the servers exit on SIGINT)
177  TSeqCollection *signalHandlers = gSystem->GetListOfSignalHandlers();
178  TSignalHandler *sh = nullptr;
179  if (signalHandlers && signalHandlers->GetSize() > 0)
180  sh = (TSignalHandler *)signalHandlers->First();
181  if (sh)
183 
184  //remove stdin from eventloop and close it
185  TSeqCollection *fileHandlers = gSystem->GetListOfFileHandlers();
186  if (fileHandlers) {
187  for (auto h : *fileHandlers) {
188  if (h && ((TFileHandler *)h)->GetFd() == 0) {
190  break;
191  }
192  }
193  }
194  close(0);
195  if (fMon.GetListOfActives()) {
196  while (fMon.GetListOfActives()->GetSize() > 0) {
197  TSocket *s = (TSocket *) fMon.GetListOfActives()->First();
198  fMon.Remove(s);
199  delete s;
200  }
201  }
202  if (fMon.GetListOfDeActives()) {
203  while (fMon.GetListOfDeActives()->GetSize() > 0) {
205  fMon.Remove(s);
206  delete s;
207  }
208  }
209  //disable graphics
210  //these instructions were copied from TApplication::MakeBatch
211  gROOT->SetBatch();
213  delete gGuiFactory;
215 #ifndef R__WIN32
216  if (gVirtualX != gGXBatch)
217  delete gVirtualX;
218 #endif
220 
221  //prepare server and add it to eventloop
222  server.Init(sockets[1], nWorker);
223 
224  //enter worker loop
225  server.Run();
226  }
227 
228  //control should never reach here
229  return true;
230 }
231 
232 
233 //////////////////////////////////////////////////////////////////////////
234 /// Send a message with the specified code to at most nMessages workers.
235 /// Sockets can either be in an "active" or "non-active" state. This method
236 /// activates all the sockets through which the client is connected to the
237 /// workers, and deactivates them when a message is sent to the corresponding
238 /// worker. This way the sockets pertaining to workers who have been left
239 /// idle will be the only ones in the active list
240 /// (TSocket::GetMonitor()->GetListOfActives()) after execution.
241 /// \param code the code to send (e.g. EMPCode)
242 /// \param nMessages
243 /// \parblock
244 /// the maximum number of messages to send.
245 /// If `nMessages == 0 || nMessage > fNWorkers`, send a message to every worker.
246 /// \endparblock
247 /// \return the number of messages successfully sent
248 unsigned TMPClient::Broadcast(unsigned code, unsigned nMessages)
249 {
250  if (nMessages == 0)
251  nMessages = fNWorkers;
252  unsigned count = 0;
253  fMon.ActivateAll();
254 
255  //send message to all sockets
256  std::unique_ptr<TList> lp(fMon.GetListOfActives());
257  for (auto s : *lp) {
258  if (count == nMessages)
259  break;
260  if (MPSend((TSocket *)s, code)) {
261  fMon.DeActivate((TSocket *)s);
262  ++count;
263  } else {
264  Error("TMPClient:Broadcast", "[E] Could not send message to server\n");
265  }
266  }
267 
268  return count;
269 }
270 
271 
272 //////////////////////////////////////////////////////////////////////////
273 /// DeActivate a certain socket.
274 /// This does not remove it from the monitor: it will be reactivated by
275 /// the next call to Broadcast() (or possibly other methods that are
276 /// specified to do so).\n
277 /// A socket should be DeActivated when the corresponding
278 /// worker is done *for now* and we want to stop listening to this worker's
279 /// socket. If the worker is done *forever*, Remove() should be used instead.
280 /// \param s the socket to be deactivated
282 {
283  fMon.DeActivate(s);
284 }
285 
286 
287 //////////////////////////////////////////////////////////////////////////
288 /// Remove a certain socket from the monitor.
289 /// A socket should be Removed from the monitor when the
290 /// corresponding worker is done *forever*. For example HandleMPCode()
291 /// calls this method on sockets pertaining to workers which sent an
292 /// MPCode::kShutdownNotice.\n
293 /// If the worker is done *for now*, DeActivate should be used instead.
294 /// \param s the socket to be removed from the monitor fMon
296 {
297  fMon.Remove(s);
298  delete s;
299 }
300 
301 
302 //////////////////////////////////////////////////////////////////////////
303 /// Wait on worker processes and remove their pids from fWorkerPids.
304 /// A blocking waitpid is called, but this should actually not block
305 /// execution since ReapWorkers should only be called when all workers
306 /// have already quit. ReapWorkers is then called not to leave zombie
307 /// processes hanging around, and to clean-up fWorkerPids.
309 {
310  for (auto &pid : fWorkerPids) {
311  waitpid(pid, nullptr, 0);
312  }
313  fWorkerPids.clear();
314 }
315 
316 
317 //////////////////////////////////////////////////////////////////////////
318 /// Handle messages containing an EMPCode.
319 /// This method should be called upon receiving a message with a code >= 1000
320 /// (i.e. EMPCode). It handles the most generic types of messages.\n
321 /// Classes inheriting from TMPClient should implement a similar method
322 /// to handle message codes specific to the application they're part of.\n
323 /// \param msg the MPCodeBufPair returned by a MPRecv call
324 /// \param s
325 /// \parblock
326 /// a pointer to the socket from which the message has been received is passed.
327 /// This way HandleMPCode knows which socket to reply on.
328 /// \endparblock
330 {
331  unsigned code = msg.first;
332  //message contains server's pid. retrieve it
333  const char *str = ReadBuffer<const char*>(msg.second.get());
334 
335  if (code == MPCode::kMessage) {
336  Error("TMPClient::HandleMPCode", "[I][C] message received: %s\n", str);
337  } else if (code == MPCode::kError) {
338  Error("TMPClient::HandleMPCode", "[E][C] error message received: %s\n", str);
339  } else if (code == MPCode::kShutdownNotice || code == MPCode::kFatalError) {
340  if (gDebug > 0) //generally users don't want to know this
341  Error("TMPClient::HandleMPCode", "[I][C] shutdown notice received from %s\n", str);
342  Remove(s);
343  } else
344  Error("TMPClient::HandleMPCode", "[W][C] unknown code received. code=%d\n", code);
345  delete [] str;
346 }
R__EXTERN TGuiFactory * gBatchGuiFactory
Definition: TGuiFactory.h:67
virtual Bool_t IsValid() const
Definition: TSocket.h:146
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:409
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
virtual TSeqCollection * GetListOfFileHandlers() const
Definition: TSystem.h:364
TMonitor fMon
This object manages the sockets and detect socket events via TMonitor::Select.
Definition: TMPClient.h:49
TH1 * h
Definition: legend2.C:5
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define gROOT
Definition: TROOT.h:375
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor&#39;s active list.
Definition: TMonitor.cxx:168
Error message.
Definition: MPCode.h:47
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:70
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:571
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
Sequenceable collection abstract base class.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2434
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:281
R__EXTERN TGuiFactory * gGuiFactory
Definition: TGuiFactory.h:66
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50
void Error(const char *location, const char *msgfmt,...)
A doubly linked list.
Definition: TList.h:43
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
R__EXTERN TVirtualX * gGXBatch
Definition: TVirtualX.h:353
Int_t fCpus
Definition: TSystem.h:155
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:561
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
R__EXTERN TSystem * gSystem
Definition: TSystem.h:539
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
virtual TSeqCollection * GetListOfSignalHandlers() const
Definition: TSystem.h:361
TLine * l
Definition: textangle.C:4
#define gVirtualX
Definition: TVirtualX.h:350
void Run()
Definition: TMPWorker.cxx:64
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:51
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:549
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process...
Definition: TMPClient.h:47
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
Definition: TMonitor.cxx:515
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
typedef void((*Func_t)())
Generic message.
Definition: MPCode.h:46
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
R__EXTERN Int_t gDebug
Definition: Rtypes.h:83
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:248
virtual Int_t GetSize() const
Definition: TCollection.h:89
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:308
virtual TObject * First() const =0