Logo ROOT  
Reference Guide
TMPClient.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "MPCode.h"
13#include "TGuiFactory.h" //gGuiFactory
14#include "TError.h" //gErrorIgnoreLevel
15#include "TMPClient.h"
16#include "TMPWorker.h"
17#include "TROOT.h" //gROOT
18#include "TSocket.h"
19#include "TSystem.h" //gSystem
20#include "TVirtualX.h" //gVirtualX
21#include <errno.h> //errno, used by socketpair
22#include <memory> //unique_ptr
23#include <sys/socket.h> //socketpair
24#include <sys/wait.h> // waitpid
25#include <unistd.h> // close, fork
26#include <dlfcn.h>
27
28//////////////////////////////////////////////////////////////////////////
29///
30/// \class TMPClient
31///
32/// Base class for multiprocess applications' clients. It provides a
33/// simple interface to fork a ROOT session into server/worker sessions
34/// and exchange messages with them. Multiprocessing applications can build
35/// on TMPClient and TMPWorker: the class providing multiprocess
36/// functionalities to users should inherit (possibly privately) from
37/// TMPClient, and the workers executing tasks should inherit from TMPWorker.
38///
39//////////////////////////////////////////////////////////////////////////
40
41//////////////////////////////////////////////////////////////////////////
42/// Class constructor.
43/// \param nWorkers
44/// \parblock
45/// the number of children processes that will be created by
46/// Fork, i.e. the number of workers that will be available after this call.
47/// The default value (0) means that a number of workers equal to the number
48/// of cores of the machine is going to be spawned. If that information is
49/// not available, 2 workers are created instead.
50/// \endparblock
51TMPClient::TMPClient(unsigned nWorkers) : fIsParent(true), fWorkerPids(), fMon(), fNWorkers(0)
52{
53 // decide on number of workers
54 if (nWorkers) {
55 fNWorkers = nWorkers;
56 } else {
57 SysInfo_t si;
58 if (gSystem->GetSysInfo(&si) == 0)
59 fNWorkers = si.fCpus;
60 else
61 fNWorkers = 2;
62 }
63}
64
65
66//////////////////////////////////////////////////////////////////////////
67/// Class destructor.
68/// This method is in charge of shutting down any remaining worker,
69/// closing off connections and reaping the terminated child processes.
71{
74 l->Delete();
75 delete l;
77 l->Delete();
78 delete l;
81}
82
namespace ROOT {
   namespace Internal {
      /// Scope guard that acquires the Python GIL on construction and releases
      /// it on destruction, where it applies: i.e. only if the Python symbols
      /// are visible in the running process and the interpreter is initialized.
      /// In every other case the guard does nothing.
      class TGILRAII {
         // The CPython entry points are looked up at run time with dlsym, so
         // this file has no link-time dependency on libPython.
         // PyGILState_STATE is a C enum in the CPython API; it is carried
         // here as an int.
         using Py_IsInitialized_type = int (*)(void);
         using PyGILState_Ensure_type = int (*)(void);
         using PyGILState_Release_type = void (*)(int);

         int fPyGILState_STATE = 0; ///< Value returned by PyGILState_Ensure, meaningful only if fAcquired
         bool fAcquired = false;    ///< True if PyGILState_Ensure was called and must be matched by a release

         /// Look up `name` among the symbols already loaded in the process.
         /// \return the symbol cast to FPTYPE, or nullptr if it is not found
         template <class FPTYPE>
         static FPTYPE GetSymT(const char *name)
         {
            return reinterpret_cast<FPTYPE>(dlsym(nullptr, name));
         }

      public:
         TGILRAII()
         {
            const auto isInitialized = GetSymT<Py_IsInitialized_type>("Py_IsInitialized");
            if (!isInitialized || !isInitialized())
               return;
            const auto ensure = GetSymT<PyGILState_Ensure_type>("PyGILState_Ensure");
            if (!ensure)
               return;
            fPyGILState_STATE = ensure();
            // Remember the call explicitly: the returned state may legitimately
            // be 0 (GIL already held by this thread), so the value itself
            // cannot tell whether a matching release is due.
            fAcquired = true;
         }

         // Copying the guard would release the same GIL state twice.
         TGILRAII(const TGILRAII &) = delete;
         TGILRAII &operator=(const TGILRAII &) = delete;

         ~TGILRAII()
         {
            if (!fAcquired)
               return;
            const auto release = GetSymT<PyGILState_Release_type>("PyGILState_Release");
            if (release)
               release(fPyGILState_STATE);
         }
      };
   }
}
111
112//////////////////////////////////////////////////////////////////////////
113/// This method forks the ROOT session into fNWorkers children processes.
114/// The ROOT sessions spawned in this way will not have graphical
115/// capabilities and will not read from standard input, but will be
116/// connected to the original (interactive) session through TSockets.
117/// The children processes' PIDs are added to the fWorkerPids vector.
118/// The parent session can then communicate with the children using the
119/// Broadcast and MPSend methods, and receive messages through MPRecv.\n
120/// \param server
121/// \parblock
122/// A pointer to an instance of the class that will take control
123/// of the subprocesses after forking. Applications should implement their
124/// own class inheriting from TMPWorker. Behaviour can be customized
125/// overriding TMPWorker::HandleInput.
126/// \endparblock
127/// \return true if Fork succeeded, false otherwise
129{
130 std::string basePath = "/tmp/ROOTMP-";
131
132 //fork as many times as needed and save pids
133 pid_t pid = 1; //must be positive to handle the case in which fNWorkers is 0
134 int sockets[2]; //sockets file descriptors
135 unsigned nWorker = 0;
136 for (; nWorker < fNWorkers; ++nWorker) {
137 //create socket pair
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
139 if (ret != 0) {
140 Error("TMPClient::Fork", "[E][C] Could not create socketpair. Error n. . Now retrying.\n%d", errno);
141 --nWorker;
142 continue;
143 }
144
145 //fork
146 {
147 ROOT::Internal::TGILRAII tgilraai;
148 pid = fork();
149 }
150
151 if (!pid) {
152 //child process, exit loop. sockets[1] is the fd that should be used
153 break;
154 } else {
155 //parent process, create TSocket with current value of sockets[0]
156 close(sockets[1]); //we don't need this
157 TSocket *s = new TSocket(sockets[0], (std::to_string(pid)).c_str()); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
158 if (s && s->IsValid()) {
159 fMon.Add(s);
160 fWorkerPids.push_back(pid);
161 } else {
162 Error("TMPClient::Fork","[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
163 delete s;
164 }
165 }
166 }
167
168 if (pid) {
169 //parent returns here
170 return true;
171 } else {
172 //CHILD/WORKER
173 fIsParent = false;
174 close(sockets[0]); //we don't need this
175
176 //override signal handler (make the servers exit on SIGINT)
178 TSignalHandler *sh = nullptr;
179 if (signalHandlers && signalHandlers->GetSize() > 0)
180 sh = (TSignalHandler *)signalHandlers->First();
181 if (sh)
183
184 //remove stdin from eventloop and close it
186 if (fileHandlers) {
187 for (auto h : *fileHandlers) {
188 if (h && ((TFileHandler *)h)->GetFd() == 0) {
190 break;
191 }
192 }
193 }
194 close(0);
195
196 std::unique_ptr<TList> lofact(fMon.GetListOfActives());
197 while (lofact && (lofact->GetSize() > 0)) {
198 TSocket *s = (TSocket *) lofact->First();
199 lofact.reset(nullptr); // delete list before object is destroyed
200 fMon.Remove(s);
201 delete s;
202 lofact.reset(fMon.GetListOfActives());
203 }
204
205 std::unique_ptr<TList> lofdeact(fMon.GetListOfDeActives());
206 while (lofdeact && (lofdeact->GetSize() > 0)) {
207 TSocket *s = (TSocket *) lofdeact->First();
208 lofdeact.reset(nullptr); // delete list before object is destroyed
209 fMon.Remove(s);
210 delete s;
211 lofdeact.reset(fMon.GetListOfDeActives());
212 }
213
214 //disable graphics
215 //these instructions were copied from TApplication::MakeBatch
216 gROOT->SetBatch();
218 delete gGuiFactory;
220#ifndef R__WIN32
221 if (gVirtualX != gGXBatch)
222 delete gVirtualX;
223#endif
225
226 //prepare server and add it to eventloop
227 server.Init(sockets[1], nWorker);
228
229 //enter worker loop
230 server.Run();
231 }
232
233 //control should never reach here
234 return true;
235}
236
237
238//////////////////////////////////////////////////////////////////////////
239/// Send a message with the specified code to at most nMessages workers.
240/// Sockets can either be in an "active" or "non-active" state. This method
241/// activates all the sockets through which the client is connected to the
242/// workers, and deactivates them when a message is sent to the corresponding
243/// worker. This way the sockets pertaining to workers who have been left
244/// idle will be the only ones in the active list
245/// (TSocket::GetMonitor()->GetListOfActives()) after execution.
246/// \param code the code to send (e.g. EMPCode)
247/// \param nMessages
248/// \parblock
249/// the maximum number of messages to send.
250/// If `nMessages == 0 || nMessages > fNWorkers`, send a message to every worker.
251/// \endparblock
252/// \return the number of messages successfully sent
253unsigned TMPClient::Broadcast(unsigned code, unsigned nMessages)
254{
255 if (nMessages == 0)
256 nMessages = fNWorkers;
257 unsigned count = 0;
259
260 //send message to all sockets
261 std::unique_ptr<TList> lp(fMon.GetListOfActives());
262 for (auto s : *lp) {
263 if (count == nMessages)
264 break;
265 if (MPSend((TSocket *)s, code)) {
267 ++count;
268 } else {
269 Error("TMPClient:Broadcast", "[E] Could not send message to server\n");
270 }
271 }
272
273 return count;
274}
275
276
277//////////////////////////////////////////////////////////////////////////
278/// DeActivate a certain socket.
279/// This does not remove it from the monitor: it will be reactivated by
280/// the next call to Broadcast() (or possibly other methods that are
281/// specified to do so).\n
282/// A socket should be DeActivated when the corresponding
283/// worker is done *for now* and we want to stop listening to this worker's
284/// socket. If the worker is done *forever*, Remove() should be used instead.
285/// \param s the socket to be deactivated
287{
289}
290
291
292//////////////////////////////////////////////////////////////////////////
293/// Remove a certain socket from the monitor.
294/// A socket should be Removed from the monitor when the
295/// corresponding worker is done *forever*. For example HandleMPCode()
296/// calls this method on sockets pertaining to workers which sent an
297/// MPCode::kShutdownNotice.\n
298/// If the worker is done *for now*, DeActivate should be used instead.
299/// \param s the socket to be removed from the monitor fMon
301{
302 fMon.Remove(s);
303 delete s;
304}
305
306
307//////////////////////////////////////////////////////////////////////////
308/// Wait on worker processes and remove their pids from fWorkerPids.
309/// A blocking waitpid is called, but this should actually not block
310/// execution since ReapWorkers should only be called when all workers
311/// have already quit. ReapWorkers is then called not to leave zombie
312/// processes hanging around, and to clean-up fWorkerPids.
314{
315 for (auto &pid : fWorkerPids) {
316 waitpid(pid, nullptr, 0);
317 }
318 fWorkerPids.clear();
319}
320
321
322//////////////////////////////////////////////////////////////////////////
323/// Handle messages containing an EMPCode.
324/// This method should be called upon receiving a message with a code >= 1000
325/// (i.e. EMPCode). It handles the most generic types of messages.\n
326/// Classes inheriting from TMPClient should implement a similar method
327/// to handle message codes specific to the application they're part of.\n
328/// \param msg the MPCodeBufPair returned by a MPRecv call
329/// \param s
330/// \parblock
331/// a pointer to the socket from which the message has been received is passed.
332/// This way HandleMPCode knows which socket to reply on.
333/// \endparblock
335{
336 unsigned code = msg.first;
337 //message contains server's pid. retrieve it
338 const char *str = ReadBuffer<const char*>(msg.second.get());
339
340 if (code == MPCode::kMessage) {
341 Error("TMPClient::HandleMPCode", "[I][C] message received: %s\n", str);
342 } else if (code == MPCode::kError) {
343 Error("TMPClient::HandleMPCode", "[E][C] error message received: %s\n", str);
344 } else if (code == MPCode::kShutdownNotice || code == MPCode::kFatalError) {
345 if (gDebug > 0) //generally users don't want to know this
346 Error("TMPClient::HandleMPCode", "[I][C] shutdown notice received from %s\n", str);
347 Remove(s);
348 } else
349 Error("TMPClient::HandleMPCode", "[W][C] unknown code received. code=%d\n", code);
350 delete [] str;
351}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define h(i)
Definition: RSha256.hxx:106
R__EXTERN Int_t gDebug
Definition: RtypesCore.h:117
void Error(const char *location, const char *msgfmt,...)
char name[80]
Definition: TGX11.cxx:109
R__EXTERN TGuiFactory * gBatchGuiFactory
Definition: TGuiFactory.h:67
R__EXTERN TGuiFactory * gGuiFactory
Definition: TGuiFactory.h:66
#define gROOT
Definition: TROOT.h:406
typedef void((*Func_t)())
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
#define gVirtualX
Definition: TVirtualX.h:338
R__EXTERN TVirtualX * gGXBatch
Definition: TVirtualX.h:341
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
A doubly linked list.
Definition: TList.h:44
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:51
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:253
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:70
TMonitor fMon
This object manages the sockets and detect socket events via TMonitor::Select.
Definition: TMPClient.h:49
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:334
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:313
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:300
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process.
Definition: TMPClient.h:47
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:286
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
void Run()
Definition: TMPWorker.cxx:63
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:54
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
Definition: TMonitor.cxx:515
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
Sequenceable collection abstract base class.
virtual TObject * First() const =0
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2459
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:562
virtual TSeqCollection * GetListOfFileHandlers() const
Definition: TSystem.h:379
virtual TSeqCollection * GetListOfSignalHandlers() const
Definition: TSystem.h:376
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:540
@ kMessage
Generic message.
Definition: MPCode.h:46
@ kError
Error message.
Definition: MPCode.h:47
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kShutdownNotice
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: StringConv.hxx:21
static constexpr double s
Int_t fCpus
Definition: TSystem.h:153
auto * l
Definition: textangle.C:4