Logo ROOT  
Reference Guide
TMPWorker.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorker.h"
17#include "TSystem.h"
18#include <memory> //unique_ptr
19#include <string>
20
21//////////////////////////////////////////////////////////////////////////
22///
23/// \class TMPWorker
24///
25/// This class works in conjunction with TMPClient, reacting to messages
26/// received from it as specified by the Run and HandleInput methods.
27/// When TMPClient::Fork is called, a TMPWorker instance is passed to it
28/// which will take control of the ROOT session in the children processes.
29///
30/// After forking, every time a message is sent or broadcast to the workers,
31/// the receive loop in TMPWorker::Run retrieves it and dispatches it.
32/// Messages exchanged between TMPClient and TMPWorker should be sent with
33/// the MPSend() standalone function.\n
34/// If the code of the message received is 1000 or above (i.e. it is an MPCode)
35/// the qualified TMPWorker::HandleInput method is called, that takes care
36/// of handling the most generic type of messages. Otherwise the unqualified
37/// (possibly overridden) version of HandleInput is called, allowing classes
38/// that inherit from TMPWorker to manage their own protocol.\n
39/// An application's worker class should inherit from TMPWorker and implement
40/// a HandleInput method that overrides TMPWorker's.\n
41///
42//////////////////////////////////////////////////////////////////////////
43
44//////////////////////////////////////////////////////////////////////////
45/// This method is called by children processes right after forking.
46/// Initialization of worker properties that must be delayed until after
47/// forking must be done here.\n
48/// For example, Init saves the pid into fPid, and adds the TMPWorker to
49/// the main eventloop (as a TFileHandler).\n
50/// Make sure this operations are performed also by overriding implementations,
51/// e.g. by calling TMPWorker::Init explicitly.
52void TMPWorker::Init(int fd, unsigned workerN)
53{
54 fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
55 fPid = getpid();
56 fNWorker = workerN;
57 fId = "W" + std::to_string(GetNWorker()) + "|P" + std::to_string(GetPid());
58}
59
60
62{
63 while(true) {
64 MPCodeBufPair msg = MPRecv(fS.get());
65 if (msg.first == MPCode::kRecvError) {
66 Error("TMPWorker::Run", "Lost connection to client\n");
67 gSystem->Exit(0);
68 }
69
70 if (msg.first < 1000)
71 HandleInput(msg); //call overridden method
72 else
73 TMPWorker::HandleInput(msg); //call this class' method
74 }
75}
76
77
78//////////////////////////////////////////////////////////////////////////
79/// Handle a message with an EMPCode.
80/// This method is called upon receiving a message with a code >= 1000 (i.e.
81/// EMPCode). It handles the most generic types of messages.\n
82/// Classes inheriting from TMPWorker should implement their own HandleInput
83/// function, that should be able to handle codes specific to that application.\n
84/// The appropriate version of the HandleInput method (TMPWorker's or the
85/// overriding version) is automatically called depending on the message code.
87{
88 unsigned code = msg.first;
89
90 std::string reply = fId;
91 if (code == MPCode::kMessage) {
92 //general message, ignore it
93 reply += ": ok";
94 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
95 } else if (code == MPCode::kError) {
96 //general error, ignore it
97 reply += ": ko";
98 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
99 } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
100 //client is asking the server to shutdown or client is dying
101 MPSend(fS.get(), MPCode::kShutdownNotice, reply.c_str());
102 gSystem->Exit(0);
103 } else {
104 reply += ": unknown code received. code=" + std::to_string(code);
105 MPSend(fS.get(), MPCode::kError, reply.c_str());
106 }
107}
108
109//////////////////////////////////////////////////////////////////////////
110/// Error sender
111
112void TMPWorker::SendError(const std::string& errmsg, unsigned int errcode)
113{
114 std::string reply = fId + ": " + errmsg;
115 MPSend(GetSocket(), errcode, reply.c_str());
116}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:32
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
R__EXTERN TSystem * gSystem
Definition: TSystem.h:559
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
Definition: TMPWorker.cxx:112
unsigned GetNWorker() const
Definition: TMPWorker.h:41
std::string fId
identifier string in the form W<nwrk>|P<proc id>
Definition: TMPWorker.h:44
pid_t GetPid()
Definition: TMPWorker.h:40
TSocket * GetSocket()
Definition: TMPWorker.h:39
unsigned fNWorker
the ordinal number of this worker (0 to nWorkers-1)
Definition: TMPWorker.h:56
virtual void HandleInput(MPCodeBufPair &msg)
Handle a message with an EMPCode.
Definition: TMPWorker.cxx:86
void Run()
Definition: TMPWorker.cxx:61
pid_t fPid
the PID of the process in which this worker is running
Definition: TMPWorker.h:55
std::unique_ptr< TSocket > fS
This worker's socket. The unique_ptr makes sure resources are released.
Definition: TMPWorker.h:54
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:52
virtual void Exit(int code, Bool_t mode=kTRUE)
Exit the application.
Definition: TSystem.cxx:719
@ kMessage
Generic message.
Definition: MPCode.h:46
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
@ kError
Error message.
Definition: MPCode.h:47
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kShutdownNotice
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50