ROOT  6.06/09
Reference Guide
TMPWorker.cxx
Go to the documentation of this file.
1 #include "TMPWorker.h"
2 #include "MPCode.h"
3 #include "MPSendRecv.h"
4 #include "TSystem.h"
5 #include <string>
6 #include <memory> //unique_ptr
7 #include <iostream>
8 
9 //////////////////////////////////////////////////////////////////////////
10 ///
11 /// \class TMPWorker
12 ///
13 /// This class works in conjunction with TMPClient, reacting to messages
14 /// received from it as specified by the Notify and HandleInput methods.
15 /// When TMPClient::Fork is called, a TMPWorker instance is passed to it
16 /// which will take control of the ROOT session in the children processes.
17 ///
18 /// After forking, every time a message is sent or broadcast to the workers,
19 /// TMPWorker::Notify is called and the message is retrieved.
20 /// Messages exchanged between TMPClient and TMPWorker should be sent with
21 /// the MPSend() standalone function.\n
22 /// If the code of the message received is 1000 or above (i.e. it is an MPCode)
23 /// the qualified TMPWorker::HandleInput method is called, that takes care
24 /// of handling the most generic type of messages. Otherwise the unqualified
25 /// (possibly overridden) version of HandleInput is called, allowing classes
26 /// that inherit from TMPWorker to manage their own protocol.\n
27 /// An application's worker class should inherit from TMPWorker and implement
28 /// a HandleInput method that overrides TMPWorker's.\n
29 ///
30 //////////////////////////////////////////////////////////////////////////
31 
32 
33 
34 //////////////////////////////////////////////////////////////////////////
35 /// Class constructor.
36 /// Note that this does not set variables like fPid or fS (worker's socket).\n
37 /// These operations are handled by the Init method, which is called after
38 /// forking.\n
39 /// This separation is in place because the instantiation of a worker
40 /// must be done once _before_ forking, while the initialization of the
41 /// members must be done _after_ forking by each of the children processes.
42 TMPWorker::TMPWorker() : fS(), fPid(0), fNWorker(0)
43 {
44 }
45 
46 
47 //////////////////////////////////////////////////////////////////////////
48 /// This method is called by children processes right after forking.
49 /// Initialization of worker properties that must be delayed until after
50 /// forking must be done here.\n
51 /// For example, Init saves the pid into fPid, and adds the TMPWorker to
52 /// the main eventloop (as a TFileHandler).\n
53 /// Make sure this operations are performed also by overriding implementations,
54 /// e.g. by calling TMPWorker::Init explicitly.
55 void TMPWorker::Init(int fd, unsigned workerN)
56 {
57  fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
58  fPid = getpid();
59  fNWorker = workerN;
60 }
61 
62 
64 {
65  while(true) {
66  MPCodeBufPair msg = MPRecv(fS.get());
67  if (msg.first == MPCode::kRecvError) {
68  std::cerr << "Lost connection to client\n";
69  gSystem->Exit(0);
70  }
71 
72  if (msg.first < 1000)
73  HandleInput(msg); //call overridden method
74  else
75  TMPWorker::HandleInput(msg); //call this class' method
76  }
77 }
78 
79 
80 //////////////////////////////////////////////////////////////////////////
81 /// Handle a message with an EMPCode.
82 /// This method is called upon receiving a message with a code >= 1000 (i.e.
83 /// EMPCode). It handles the most generic types of messages.\n
84 /// Classes inheriting from TMPWorker should implement their own HandleInput
85 /// function, that should be able to handle codes specific to that application.\n
86 /// The appropriate version of the HandleInput method (TMPWorker's or the
87 /// overriding version) is automatically called depending on the message code.
89 {
90  unsigned code = msg.first;
91 
92  std::string reply = "S" + std::to_string(fNWorker);
93  if (code == MPCode::kMessage) {
94  //general message, ignore it
95  reply += ": ok";
96  MPSend(fS.get(), MPCode::kMessage, reply.data());
97  } else if (code == MPCode::kError) {
98  //general error, ignore it
99  reply += ": ko";
100  MPSend(fS.get(), MPCode::kMessage, reply.data());
101  } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
102  //client is asking the server to shutdown or client is dying
103  MPSend(fS.get(), MPCode::kShutdownNotice, reply.data());
104  gSystem->Exit(0);
105  } else {
106  reply += ": unknown code received. code=" + std::to_string(code);
107  MPSend(fS.get(), MPCode::kError, reply.data());
108  }
109 }
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TMPWorker.cxx:88
pid_t fPid
the PID of the process in which this worker is running
Definition: TMPWorker.h:42
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
Definition: TMPWorker.h:43
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
Error message.
Definition: MPCode.h:30
Error while reading from the socket.
Definition: MPCode.h:34
Used by the workers to notify client of shutdown.
Definition: MPCode.h:33
std::unique_ptr< TSocket > fS
This worker's socket. The unique_ptr makes sure resources are released.
Definition: TMPWorker.h:41
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:31
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
TMPWorker()
Class constructor.
Definition: TMPWorker.cxx:42
void Run()
Definition: TMPWorker.cxx:63
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:20
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Definition: TSystem.cxx:720
Generic message.
Definition: MPCode.h:29
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:43