1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
13#include "MPCode.h"
14#include "MPSendRecv.h"
15#include "TError.h"
16#include "TMPWorker.h"
17#include "TSystem.h"
18#include <memory> //unique_ptr
19#include <string>
23/// \class TMPWorker
/// This class works in conjunction with TMPClient, reacting to messages
/// received from it as specified by the Notify and HandleInput methods.
27/// When TMPClient::Fork is called, a TMPWorker instance is passed to it
28/// which will take control of the ROOT session in the children processes.
30/// After forking, every time a message is sent or broadcast to the workers,
31/// TMPWorker::Notify is called and the message is retrieved.
32/// Messages exchanged between TMPClient and TMPWorker should be sent with
33/// the MPSend() standalone function.\n
/// If the code of the message received is 1000 or above (i.e. it is an MPCode)
/// the qualified TMPWorker::HandleInput method is called, which takes care
/// of handling the most generic type of messages. Otherwise the unqualified
/// (possibly overridden) version of HandleInput is called, allowing classes
/// that inherit from TMPWorker to manage their own protocol.\n
39/// An application's worker class should inherit from TMPWorker and implement
40/// a HandleInput method that overrides TMPWorker's.\n
45/// This method is called by children processes right after forking.
46/// Initialization of worker properties that must be delayed until after
47/// forking must be done here.\n
48/// For example, Init saves the pid into fPid, and adds the TMPWorker to
49/// the main eventloop (as a TFileHandler).\n
/// Make sure these operations are also performed by overriding implementations,
/// e.g. by calling TMPWorker::Init explicitly.
52void TMPWorker::Init(int fd, unsigned workerN)
54 fS.reset(new TSocket(fd, "MPsock")); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
55 fPid = getpid();
56 fNWorker = workerN;
57 fId = "W" + std::to_string(GetNWorker()) + "|P" + std::to_string(GetPid());
63 while(true) {
64 MPCodeBufPair msg = MPRecv(fS.get());
65 if (msg.first == MPCode::kRecvError) {
66 Error("TMPWorker::Run", "Lost connection to client\n");
67 gSystem->Exit(0);
68 }
70 if (msg.first < 1000)
71 HandleInput(msg); //call overridden method
72 else
73 TMPWorker::HandleInput(msg); //call this class' method
74 }
79/// Handle a message with an EMPCode.
80/// This method is called upon receiving a message with a code >= 1000 (i.e.
81/// EMPCode). It handles the most generic types of messages.\n
82/// Classes inheriting from TMPWorker should implement their own HandleInput
83/// function, that should be able to handle codes specific to that application.\n
84/// The appropriate version of the HandleInput method (TMPWorker's or the
85/// overriding version) is automatically called depending on the message code.
88 unsigned code = msg.first;
90 std::string reply = fId;
91 if (code == MPCode::kMessage) {
92 //general message, ignore it
93 reply += ": ok";
94 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
95 } else if (code == MPCode::kError) {
96 //general error, ignore it
97 reply += ": ko";
98 MPSend(fS.get(), MPCode::kMessage, reply.c_str());
99 } else if (code == MPCode::kShutdownOrder || code == MPCode::kFatalError) {
100 //client is asking the server to shutdown or client is dying
101 MPSend(fS.get(), MPCode::kShutdownNotice, reply.c_str());
102 gSystem->Exit(0);
103 } else {
104 reply += ": unknown code received. code=" + std::to_string(code);
105 MPSend(fS.get(), MPCode::kError, reply.c_str());
106 }
110/// Error sender
112void TMPWorker::SendError(const std::string& errmsg, unsigned int errcode)
114 std::string reply = fId + ": " + errmsg;
115 MPSend(GetSocket(), errcode, reply.c_str());
