22namespace MultiProcess {
69 return "ipc://" +
tmpPath +
"/roofit_" + std::to_string(pid) +
"_roofitMP";
114 for (std::size_t ix = 0; ix <
poll_results.size(); ++ix) {
116 assert(request ==
"present");
122 }
else if (process_manager.
is_queue()) {
133 for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
162 }
else if (process_manager.
is_worker()) {
185 mw_sub_->set(zmq::sockopt::subscribe,
"");
206 throw std::runtime_error(
"Messenger ctor: I'm neither master, nor queue, nor a worker");
208 }
catch (zmq::error_t &
e) {
209 std::cerr <<
e.what() <<
" -- errnum: " <<
e.num() << std::endl;
225 remove(address.substr(6).c_str());
227 }
catch (
const std::exception &
e) {
228 std::cerr <<
"WARNING: something in Messenger dtor threw an exception! Original exception message:\n"
229 <<
e.what() << std::endl;
281 }
catch (zmq::error_t &
e) {
283 throw std::runtime_error(
"Messenger::test_connections: SEND over master-queue connection timed out!");
295 std::size_t
tries = 0;
321 throw std::runtime_error(
"EINTR in test_receive and SIGTERM received, aborting\n");
323 printf(
"EINTR in test_receive but no SIGTERM received, try %zu\n",
tries);
326 printf(
"EAGAIN in test_receive, try %zu\n",
tries);
329 }
catch (zmq::error_t &
e) {
331 throw std::runtime_error(
"Messenger::test_connections: RECEIVE over master-queue connection timed out!");
333 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
341 throw std::runtime_error(
342 "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
363 throw std::runtime_error(
"sigprocmask failed in test_connections");
374 }
else if (process_manager.
is_queue()) {
379 for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
386 std::vector<std::pair<size_t, zmq::event_flags>>
poll_result;
414 }
else if (process_manager.
is_worker()) {
421 throw std::runtime_error(
"Messenger::test_connections: I'm neither master, nor queue, nor a worker");
436 poller.register_socket(*s, zmq::event_flags::pollin);
469#define PROCESS_VAL(p) \
470 case (p): s = #p; break;
477 default: s = std::to_string(
static_cast<int>(
value));
487 default: s = std::to_string(
static_cast<int>(
value));
498 default: s = std::to_string(
static_cast<int>(
value));
509 default: s = std::to_string(
static_cast<int>(
value));
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
R__EXTERN TSystem * gSystem
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
bool close_QW_container_on_destruct_
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
ZeroMQPoller mq_push_poller_
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
zmq::send_flags send_flag_
void send_from_queue_to_master()
Messenger(const ProcessManager &process_manager)
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
bool close_this_QW_on_destruct_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
void send_from_master_to_queue()
ZeroMQPoller mw_sub_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
ZeroMQPoller mq_pull_poller_
bool close_MQ_on_destruct_
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
void send_from_worker_to_queue()
ZeroMQPoller wm_pull_poller_
Fork processes for queue and workers.
std::size_t N_workers() const
static bool sigterm_received()
std::size_t worker_id() const
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Wrapper class for polling ZeroMQ sockets.
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
std::ostream & operator<<(std::ostream &out, const M2Q value)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...