Logo ROOT   6.08/07
Reference Guide
TMPClient.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "MPCode.h"
13 #include "TGuiFactory.h" //gGuiFactory
14 #include "TError.h" //gErrorIgnoreLevel
15 #include "TMPClient.h"
16 #include "TMPWorker.h"
17 #include "TROOT.h" //gROOT
18 #include "TSocket.h"
19 #include "TSystem.h" //gSystem
20 #include "TVirtualX.h" //gVirtualX
21 #include <errno.h> //errno, used by socketpair
22 #include <memory> //unique_ptr
23 #include <sys/socket.h> //socketpair
24 #include <sys/wait.h> // waitpid
25 #include <unistd.h> // close, fork
26 
27 //////////////////////////////////////////////////////////////////////////
28 ///
29 /// \class TMPClient
30 ///
31 /// Base class for multiprocess applications' clients. It provides a
32 /// simple interface to fork a ROOT session into server/worker sessions
33 /// and exchange messages with them. Multiprocessing applications can build
34 /// on TMPClient and TMPWorker: the class providing multiprocess
35 /// functionalities to users should inherit (possibly privately) from
36 /// TMPClient, and the workers executing tasks should inherit from TMPWorker.
37 ///
38 //////////////////////////////////////////////////////////////////////////
39 
40 //////////////////////////////////////////////////////////////////////////
41 /// Class constructor.
42 /// \param nWorkers
43 /// \parblock
44 /// the number of children processes that will be created by
45 /// Fork, i.e. the number of workers that will be available after this call.
46 /// The default value (0) means that a number of workers equal to the number
47 /// of cores of the machine is going to be spawned. If that information is
48 /// not available, 2 workers are created instead.
49 /// \endparblock
50 TMPClient::TMPClient(unsigned nWorkers) : fIsParent(true), fWorkerPids(), fMon(), fNWorkers(0)
51 {
52  // decide on number of workers
53  if (nWorkers) {
54  fNWorkers = nWorkers;
55  } else {
56  SysInfo_t si;
57  if (gSystem->GetSysInfo(&si) == 0)
58  fNWorkers = si.fCpus;
59  else
60  fNWorkers = 2;
61  }
62 }
63 
64 
65 //////////////////////////////////////////////////////////////////////////
66 /// Class destructor.
67 /// This method is in charge of shutting down any remaining worker,
68 /// closing off connections and reaping the terminated children processes.
70 {
73  l->Delete();
74  delete l;
76  l->Delete();
77  delete l;
78  fMon.RemoveAll();
79  ReapWorkers();
80 }
81 
82 
83 //////////////////////////////////////////////////////////////////////////
84 /// This method forks the ROOT session into fNWorkers children processes.
85 /// The ROOT sessions spawned in this way will not have graphical
86 /// capabilities and will not read from standard input, but will be
87 /// connected to the original (interactive) session through TSockets.
88 /// The children processes' PIDs are added to the fWorkerPids vector.
89 /// The parent session can then communicate with the children using the
90 /// Broadcast and MPSend methods, and receive messages through MPRecv.\n
91 /// \param server
92 /// \parblock
93 /// A pointer to an instance of the class that will take control
94 /// of the subprocesses after forking. Applications should implement their
95 /// own class inheriting from TMPWorker. Behaviour can be customized
96 /// overriding TMPWorker::HandleInput.
97 /// \endparblock
98 /// \return true if Fork succeeded, false otherwise
100 {
101  std::string basePath = "/tmp/ROOTMP-";
102 
103  //fork as many times as needed and save pids
104  pid_t pid = 1; //must be positive to handle the case in which fNWorkers is 0
105  int sockets[2]; //sockets file descriptors
106  unsigned nWorker = 0;
107  for (; nWorker < fNWorkers; ++nWorker) {
108  //create socket pair
109  int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
110  if (ret != 0) {
111  Error("TMPClient::Fork", "[E][C] Could not create socketpair. Error n. . Now retrying.\n%d", errno);
112  --nWorker;
113  continue;
114  }
115 
116  //fork
117  pid = fork();
118 
119  if (!pid) {
120  //child process, exit loop. sockets[1] is the fd that should be used
121  break;
122  } else {
123  //parent process, create TSocket with current value of sockets[0]
124  close(sockets[1]); //we don't need this
125  TSocket *s = new TSocket(sockets[0], (std::to_string(pid)).c_str()); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
126  if (s && s->IsValid()) {
127  fMon.Add(s);
128  fWorkerPids.push_back(pid);
129  } else {
130  Error("TMPClient::Fork","[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
131  delete s;
132  }
133  }
134  }
135 
136  if (pid) {
137  //parent returns here
138  return true;
139  } else {
140  //CHILD/WORKER
141  fIsParent = false;
142  close(sockets[0]); //we don't need this
143 
144  //override signal handler (make the servers exit on SIGINT)
145  TSeqCollection *signalHandlers = gSystem->GetListOfSignalHandlers();
146  TSignalHandler *sh = nullptr;
147  if (signalHandlers && signalHandlers->GetSize() > 0)
148  sh = (TSignalHandler *)signalHandlers->First();
149  if (sh)
151 
152  //remove stdin from eventloop and close it
153  TSeqCollection *fileHandlers = gSystem->GetListOfFileHandlers();
154  if (fileHandlers) {
155  for (auto h : *fileHandlers) {
156  if (h && ((TFileHandler *)h)->GetFd() == 0) {
158  break;
159  }
160  }
161  }
162  close(0);
163  if (fMon.GetListOfActives()) {
164  while (fMon.GetListOfActives()->GetSize() > 0) {
165  TSocket *s = (TSocket *) fMon.GetListOfActives()->First();
166  fMon.Remove(s);
167  delete s;
168  }
169  }
170  if (fMon.GetListOfDeActives()) {
171  while (fMon.GetListOfDeActives()->GetSize() > 0) {
173  fMon.Remove(s);
174  delete s;
175  }
176  }
177  //disable graphics
178  //these instructions were copied from TApplication::MakeBatch
179  gROOT->SetBatch();
181  delete gGuiFactory;
183 #ifndef R__WIN32
184  if (gVirtualX != gGXBatch)
185  delete gVirtualX;
186 #endif
188 
189  //prepare server and add it to eventloop
190  server.Init(sockets[1], nWorker);
191 
192  //enter worker loop
193  server.Run();
194  }
195 
196  //control should never reach here
197  return true;
198 }
199 
200 
201 //////////////////////////////////////////////////////////////////////////
202 /// Send a message with the specified code to at most nMessages workers.
203 /// Sockets can either be in an "active" or "non-active" state. This method
204 /// activates all the sockets through which the client is connected to the
205 /// workers, and deactivates them when a message is sent to the corresponding
206 /// worker. This way the sockets pertaining to workers who have been left
207 /// idle will be the only ones in the active list
208 /// (TSocket::GetMonitor()->GetListOfActives()) after execution.
209 /// \param code the code to send (e.g. EMPCode)
210 /// \param nMessages
211 /// \parblock
212 /// the maximum number of messages to send.
213 /// If `nMessages == 0 || nMessage > fNWorkers`, send a message to every worker.
214 /// \endparblock
215 /// \return the number of messages successfully sent
216 unsigned TMPClient::Broadcast(unsigned code, unsigned nMessages)
217 {
218  if (nMessages == 0)
219  nMessages = fNWorkers;
220  unsigned count = 0;
221  fMon.ActivateAll();
222 
223  //send message to all sockets
224  std::unique_ptr<TList> lp(fMon.GetListOfActives());
225  for (auto s : *lp) {
226  if (count == nMessages)
227  break;
228  if (MPSend((TSocket *)s, code)) {
229  fMon.DeActivate((TSocket *)s);
230  ++count;
231  } else {
232  Error("TMPClient:Broadcast", "[E] Could not send message to server\n");
233  }
234  }
235 
236  return count;
237 }
238 
239 
240 //////////////////////////////////////////////////////////////////////////
241 /// DeActivate a certain socket.
242 /// This does not remove it from the monitor: it will be reactivated by
243 /// the next call to Broadcast() (or possibly other methods that are
244 /// specified to do so).\n
245 /// A socket should be DeActivated when the corresponding
246 /// worker is done *for now* and we want to stop listening to this worker's
247 /// socket. If the worker is done *forever*, Remove() should be used instead.
248 /// \param s the socket to be deactivated
250 {
251  fMon.DeActivate(s);
252 }
253 
254 
255 //////////////////////////////////////////////////////////////////////////
256 /// Remove a certain socket from the monitor.
257 /// A socket should be Removed from the monitor when the
258 /// corresponding worker is done *forever*. For example HandleMPCode()
259 /// calls this method on sockets pertaining to workers which sent an
260 /// MPCode::kShutdownNotice.\n
261 /// If the worker is done *for now*, DeActivate should be used instead.
262 /// \param s the socket to be removed from the monitor fMon
264 {
265  fMon.Remove(s);
266  delete s;
267 }
268 
269 
270 //////////////////////////////////////////////////////////////////////////
271 /// Wait on worker processes and remove their pids from fWorkerPids.
272 /// A blocking waitpid is called, but this should actually not block
273 /// execution since ReapWorkers should only be called when all workers
274 /// have already quit. ReapWorkers is called so as not to leave zombie
275 /// processes hanging around, and to clean up fWorkerPids.
277 {
278  for (auto &pid : fWorkerPids) {
279  waitpid(pid, nullptr, 0);
280  }
281  fWorkerPids.clear();
282 }
283 
284 
285 //////////////////////////////////////////////////////////////////////////
286 /// Handle messages containing an EMPCode.
287 /// This method should be called upon receiving a message with a code >= 1000
288 /// (i.e. EMPCode). It handles the most generic types of messages.\n
289 /// Classes inheriting from TMPClient should implement a similar method
290 /// to handle message codes specific to the application they're part of.\n
291 /// \param msg the MPCodeBufPair returned by a MPRecv call
292 /// \param s
293 /// \parblock
294 /// a pointer to the socket from which the message was received.
295 /// This way HandleMPCode knows which socket to reply on.
296 /// \endparblock
298 {
299  unsigned code = msg.first;
300  //message contains server's pid. retrieve it
301  const char *str = ReadBuffer<const char*>(msg.second.get());
302 
303  if (code == MPCode::kMessage) {
304  Error("TMPClient::HandleMPCode", "[I][C] message received: %s\n", str);
305  } else if (code == MPCode::kError) {
306  Error("TMPClient::HandleMPCode", "[E][C] error message received: %s\n", str);
307  } else if (code == MPCode::kShutdownNotice || code == MPCode::kFatalError) {
308  if (gDebug > 0) //generally users don't want to know this
309  Error("TMPClient::HandleMPCode", "[I][C] shutdown notice received from %s\n", str);
310  Remove(s);
311  } else
312  Error("TMPClient::HandleMPCode", "[W][C] unknown code received. code=%d\n", code);
313  delete [] str;
314 }
R__EXTERN TGuiFactory * gBatchGuiFactory
Definition: TGuiFactory.h:69
virtual Bool_t IsValid() const
Definition: TSocket.h:162
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:405
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:99
virtual TSeqCollection * GetListOfFileHandlers() const
Definition: TSystem.h:374
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
Definition: TMPClient.h:49
TH1 * h
Definition: legend2.C:5
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:30
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define gROOT
Definition: TROOT.h:364
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Error message.
Definition: MPCode.h:30
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:69
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:569
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:263
Sequenceable collection abstract base class.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2440
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:249
R__EXTERN TGuiFactory * gGuiFactory
Definition: TGuiFactory.h:68
Used by the workers to notify client of shutdown.
Definition: MPCode.h:33
void Error(const char *location, const char *msgfmt,...)
A doubly linked list.
Definition: TList.h:47
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:31
R__EXTERN TVirtualX * gGXBatch
Definition: TVirtualX.h:365
Int_t fCpus
Definition: TSystem.h:165
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:557
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
virtual TSeqCollection * GetListOfSignalHandlers() const
Definition: TSystem.h:371
TLine * l
Definition: textangle.C:4
#define gVirtualX
Definition: TVirtualX.h:362
void Run()
Definition: TMPWorker.cxx:120
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:50
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:547
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process...
Definition: TMPClient.h:47
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
Definition: TMonitor.cxx:515
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:297
Generic message.
Definition: MPCode.h:29
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:111
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:216
virtual Int_t GetSize() const
Definition: TCollection.h:95
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:276
virtual TObject * First() const =0