Logo ROOT   6.12/07
Reference Guide
TMPWorker.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "MPCode.h"
14 #include "MPSendRecv.h"
15 #include "TEnv.h"
16 #include "TError.h"
17 #include "TMPWorker.h"
18 #include "TSystem.h"
19 #include <memory> //unique_ptr
20 #include <string>
21 
22 #include <iostream>
23 
24 //////////////////////////////////////////////////////////////////////////
25 ///
26 /// \class TMPWorker
27 ///
28 /// This class works in conjuction with TMPClient, reacting to messages
29 /// received from it as specified by the Notify and HandleInput methods.
30 /// When TMPClient::Fork is called, a TMPWorker instance is passed to it
31 /// which will take control of the ROOT session in the children processes.
32 ///
33 /// After forking, every time a message is sent or broadcast to the workers,
34 /// TMPWorker::Notify is called and the message is retrieved.
35 /// Messages exchanged between TMPClient and TMPWorker should be sent with
36 /// the MPSend() standalone function.\n
37 /// If the code of the message received is above 1000 (i.e. it is an MPCode)
38 /// the qualified TMPWorker::HandleInput method is called, that takes care
39 /// of handling the most generic type of messages. Otherwise the unqualified
40 /// (possibly overridden) version of HandleInput is called, allowing classes
41 /// that inherit from TMPWorker to manage their own protocol.\n
42 /// An application's worker class should inherit from TMPWorker and implement
43 /// a HandleInput method that overrides TMPWorker's.\n
44 ///
45 //////////////////////////////////////////////////////////////////////////
46 
47 //////////////////////////////////////////////////////////////////////////
48 /// This method is called by children processes right after forking.
49 /// Initialization of worker properties that must be delayed until after
50 /// forking must be done here.\n
51 /// For example, Init saves the pid into fPid, and adds the TMPWorker to
52 /// the main eventloop (as a TFileHandler).\n
53 /// Make sure this operations are performed also by overriding implementations,
54 /// e.g. by calling TMPWorker::Init explicitly.
55 void TMPWorker::Init(int fd, unsigned workerN)
56 {
57  fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
58  fPid = getpid();
59  fNWorker = workerN;
60  fId = "W" + std::to_string(GetNWorker()) + "|P" + std::to_string(GetPid());
61 }
62 
63 
65 {
66  while(true) {
67  MPCodeBufPair msg = MPRecv(fS.get());
68  if (msg.first == MPCode::kRecvError) {
69  Error("TMPWorker::Run", "Lost connection to client\n");
70  gSystem->Exit(0);
71  }
72 
73  if (msg.first < 1000)
74  HandleInput(msg); //call overridden method
75  else
76  TMPWorker::HandleInput(msg); //call this class' method
77  }
78 }
79 
80 
81 //////////////////////////////////////////////////////////////////////////
82 /// Handle a message with an EMPCode.
83 /// This method is called upon receiving a message with a code >= 1000 (i.e.
84 /// EMPCode). It handles the most generic types of messages.\n
85 /// Classes inheriting from TMPWorker should implement their own HandleInput
86 /// function, that should be able to handle codes specific to that application.\n
87 /// The appropriate version of the HandleInput method (TMPWorker's or the
88 /// overriding version) is automatically called depending on the message code.
90 {
91  unsigned code = msg.first;
92 
93  std::string reply = fId;
94  if (code == MPCode::kMessage) {
95  //general message, ignore it
96  reply += ": ok";
97  MPSend(fS.get(), MPCode::kMessage, reply.c_str());
98  } else if (code == MPCode::kError) {
99  //general error, ignore it
100  reply += ": ko";
101  MPSend(fS.get(), MPCode::kMessage, reply.c_str());
102  } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
103  //client is asking the server to shutdown or client is dying
104  MPSend(fS.get(), MPCode::kShutdownNotice, reply.c_str());
105  gSystem->Exit(0);
106  } else {
107  reply += ": unknown code received. code=" + std::to_string(code);
108  MPSend(fS.get(), MPCode::kError, reply.c_str());
109  }
110 }
111 
112 //////////////////////////////////////////////////////////////////////////
113 /// Error sender
114 
115 void TMPWorker::SendError(const std::string& errmsg, unsigned int errcode)
116 {
117  std::string reply = fId + ": " + errmsg;
118  MPSend(GetSocket(), errcode, reply.c_str());
119 }
pid_t GetPid()
Definition: TMPWorker.h:44
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TMPWorker.cxx:89
pid_t fPid
the PID of the process in which this worker is running
Definition: TMPWorker.h:59
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
Definition: TMPWorker.h:60
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error message.
Definition: MPCode.h:47
Error while reading from the socket.
Definition: MPCode.h:51
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:115
unsigned GetNWorker() const
Definition: TMPWorker.h:45
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50
void Error(const char *location, const char *msgfmt,...)
std::unique_ptr< TSocket > fS
This worker&#39;s socket. The unique_ptr makes sure resources are released.
Definition: TMPWorker.h:58
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
TSocket * GetSocket()
Definition: TMPWorker.h:43
void Run()
Definition: TMPWorker.cxx:64
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
std::string fId
identifier string in the form W<nwrk>|P<proc id>="">
Definition: TMPWorker.h:48
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Definition: TSystem.cxx:724
Generic message.
Definition: MPCode.h:46
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54