22namespace MultiProcess {
27 socket->set(zmq::sockopt::immediate, optval);
67 auto makeAddrPrefix = [](pid_t pid) -> std::string {
69 return "ipc://" + tmpPath +
"/roofit_" + std::to_string(pid) +
"_roofitMP";
80 auto addrBase = makeAddrPrefix(getpid());
83 mq_push_->set(zmq::sockopt::sndhwm, hwm);
89 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
95 mw_pub_->set(zmq::sockopt::sndhwm, hwm);
99 wm_pull_->set(zmq::sockopt::rcvhwm, hwm);
107 bindAddr(subscriber_ping_socket, addrBase +
"_subscriber_ping_socket");
109 subscriber_ping_poller.
register_socket(*subscriber_ping_socket, zmq::event_flags::pollin);
110 std::size_t N_subscribers_confirmed = 0;
111 while (N_subscribers_confirmed < process_manager.
N_workers()) {
113 auto poll_results = subscriber_ping_poller.
poll(0);
114 for (std::size_t ix = 0; ix < poll_results.size(); ++ix) {
115 auto request =
zmqSvc().
receive<std::string>(*subscriber_ping_socket, zmq::recv_flags::dontwait);
116 assert(request ==
"present");
117 ++N_subscribers_confirmed;
122 }
else if (process_manager.
is_queue()) {
123 auto addrBase = makeAddrPrefix(getppid());
133 for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
136 bindAddr(
qw_push_[ix], addrBase +
"_from_queue_to_worker_" + std::to_string(ix));
142 bindAddr(
qw_pull_[ix], addrBase +
"_from_worker_" + std::to_string(ix) +
"_to_queue");
149 mq_push_->set(zmq::sockopt::sndhwm, hwm);
150 mq_push_->connect(addrBase +
"_from_queue_to_master");
155 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
156 mq_pull_->connect(addrBase +
"_from_master_to_queue");
162 }
else if (process_manager.
is_worker()) {
163 auto addrBase = makeAddrPrefix(getppid());
171 auto addr = addrBase +
"_from_worker_" + std::to_string(process_manager.
worker_id()) +
"_to_queue";
178 addr = addrBase +
"_from_queue_to_worker_" + std::to_string(process_manager.
worker_id());
184 mw_sub_->set(zmq::sockopt::rcvhwm, hwm);
185 mw_sub_->set(zmq::sockopt::subscribe,
"");
186 mw_sub_->connect(addrBase +
"_from_master_to_workers");
190 wm_push_->set(zmq::sockopt::sndhwm, hwm);
191 wm_push_->connect(addrBase +
"_from_workers_to_master");
195 subscriber_ping_socket->connect(addrBase +
"_subscriber_ping_socket");
197 zmqSvc().
send(*subscriber_ping_socket,
"present");
199 while (!all_connected) {
206 throw std::runtime_error(
"Messenger ctor: I'm neither master, nor queue, nor a worker");
208 }
catch (zmq::error_t &
e) {
209 std::cerr <<
e.what() <<
" -- errnum: " <<
e.num() << std::endl;
225 remove(address.substr(6).c_str());
227 }
catch (
const std::exception &
e) {
228 std::cerr <<
"WARNING: something in Messenger dtor threw an exception! Original exception message:\n"
229 <<
e.what() << std::endl;
281 }
catch (zmq::error_t &
e) {
282 if (
e.num() == EAGAIN) {
283 throw std::runtime_error(
"Messenger::test_connections: SEND over master-queue connection timed out!");
294 std::size_t max_tries = 3;
295 std::size_t tries = 0;
296 bool carry_on =
true;
297 while (carry_on && (tries++ < max_tries)) {
301 handshake = receive_from_master_on_queue<X2X>();
305 handshake = receive_from_queue_on_master<X2X>();
309 handshake = receive_from_queue_on_worker<X2X>();
313 handshake = receive_from_worker_on_queue<X2X>(worker_id);
321 throw std::runtime_error(
"EINTR in test_receive and SIGTERM received, aborting\n");
323 printf(
"EINTR in test_receive but no SIGTERM received, try %zu\n", tries);
326 printf(
"EAGAIN in test_receive, try %zu\n", tries);
329 }
catch (zmq::error_t &
e) {
330 if (
e.num() == EAGAIN) {
331 throw std::runtime_error(
"Messenger::test_connections: RECEIVE over master-queue connection timed out!");
333 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
340 if (handshake != expected_ping_value) {
341 throw std::runtime_error(
342 "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
359 sigemptyset(&sigmask);
360 sigaddset(&sigmask, SIGTERM);
363 throw std::runtime_error(
"sigprocmask failed in test_connections");
374 }
else if (process_manager.
is_queue()) {
376 std::size_t mq_index;
379 for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
386 std::vector<std::pair<size_t, zmq::event_flags>> poll_result;
393 for (
auto readable_socket : poll_result) {
395 if (readable_socket.first == mq_index) {
402 auto this_worker_id = readable_socket.first - 1;
414 }
else if (process_manager.
is_worker()) {
421 throw std::runtime_error(
"Messenger::test_connections: I'm neither master, nor queue, nor a worker");
438 return {std::move(poller), mq_index};
447 return {std::move(poller), mw_sub_index};
// PROCESS_VAL(p): expands to a `case (p):` label that assigns the stringified
// token p (via the preprocessor # operator) to the variable `s`, then breaks.
// Intended for use inside a switch where a variable named `s` is in scope.
469#define PROCESS_VAL(p) \
470 case (p): s = #p; break;
477 default: s = std::to_string(
static_cast<int>(
value));
487 default: s = std::to_string(
static_cast<int>(
value));
498 default: s = std::to_string(
static_cast<int>(
value));
509 default: s = std::to_string(
static_cast<int>(
value));
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
R__EXTERN TSystem * gSystem
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
bool close_QW_container_on_destruct_
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
ZeroMQPoller mq_push_poller_
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
zmq::send_flags send_flag_
void send_from_queue_to_master()
Messenger(const ProcessManager &process_manager)
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
bool close_this_QW_on_destruct_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
void send_from_master_to_queue()
ZeroMQPoller mw_sub_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
ZeroMQPoller mq_pull_poller_
bool close_MQ_on_destruct_
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
void send_from_worker_to_queue()
ZeroMQPoller wm_pull_poller_
Fork processes for queue and workers.
std::size_t N_workers() const
static bool sigterm_received()
std::size_t worker_id() const
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Wrapper class for polling ZeroMQ sockets.
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
Receive message with ZMQ, general version.
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or other types of arguments).