Manages ZeroMQ sockets and wraps send and receive calls.
This class is used for all interprocess communication between the master, queue and worker processes. It sets up ZeroMQ sockets between all processes over IPC socket files stored in /tmp on the filesystem.
Several sockets are used for communication between different places for different purposes:
process_manager | ProcessManager instance which manages the master, queue and worker processes that we want to set up communication for in this Messenger. |
Definition at line 33 of file Messenger_decl.h.
Public Types | |
enum class | test_rcv_pipes { fromQonM , fromMonQ , fromWonQ , fromQonW } |
enum class | test_snd_pipes { M2Q , Q2M , Q2W , W2Q } |
Public Member Functions | |
Messenger (const ProcessManager &process_manager) | |
~Messenger () | |
std::pair< ZeroMQPoller, std::size_t > | create_queue_poller () |
Helper function that creates a poller for Queue::loop() | |
std::pair< ZeroMQPoller, std::size_t > | create_worker_poller () |
Helper function that creates a poller for worker_loop() | |
template<typename T > | |
void | publish_from_master_to_workers (T &&item) |
specialization that sends the final message | |
template<typename T , typename T2 , typename... Ts> | |
void | publish_from_master_to_workers (T &&item, T2 &&item2, Ts &&...items) |
specialization that queues first parts of multipart messages | |
template<typename value_t > | |
value_t | receive_from_master_on_queue () |
template<typename value_t > | |
value_t | receive_from_master_on_worker (bool *more=nullptr) |
template<typename value_t > | |
value_t | receive_from_queue_on_master () |
template<typename value_t > | |
value_t | receive_from_queue_on_worker () |
template<typename value_t > | |
value_t | receive_from_worker_on_master (bool *more=nullptr) |
template<typename value_t > | |
value_t | receive_from_worker_on_queue (std::size_t this_worker_id) |
void | send_from_master_to_queue () |
template<typename T , typename... Ts> | |
void | send_from_master_to_queue (T item, Ts... items) |
void | send_from_queue_to_master () |
template<typename T , typename... Ts> | |
void | send_from_queue_to_master (T item, Ts... items) |
void | send_from_queue_to_worker (std::size_t this_worker_id) |
template<typename T , typename... Ts> | |
void | send_from_queue_to_worker (std::size_t this_worker_id, T item, Ts... items) |
template<typename T > | |
void | send_from_worker_to_master (T &&item) |
specialization that sends the final message | |
template<typename T , typename T2 , typename... Ts> | |
void | send_from_worker_to_master (T &&item, T2 &&item2, Ts &&...items) |
specialization that queues first parts of multipart messages | |
void | send_from_worker_to_queue () |
template<typename T , typename... Ts> | |
void | send_from_worker_to_queue (T item, Ts... items) |
void | set_send_flag (zmq::send_flags flag) |
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination. | |
void | test_connections (const ProcessManager &process_manager) |
Test whether push-pull sockets are working. | |
void | test_receive (X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id) |
void | test_send (X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id) |
Public Attributes | |
sigset_t | ppoll_sigmask |
Private Member Functions | |
template<class T > | |
void | bindAddr (T &socket, std::string &&addr) |
void | debug_print (std::string s) |
Function called from send and receive template functions in debug builds used to monitor the messages that are going to be sent or are received. | |
Private Attributes | |
std::vector< std::string > | bound_ipc_addresses_ |
bool | close_MQ_on_destruct_ = false |
bool | close_QW_container_on_destruct_ = false |
bool | close_this_QW_on_destruct_ = false |
ZmqLingeringSocketPtr | mq_pull_ |
ZeroMQPoller | mq_pull_poller_ |
ZmqLingeringSocketPtr | mq_push_ |
ZeroMQPoller | mq_push_poller_ |
ZmqLingeringSocketPtr | mw_pub_ |
ZmqLingeringSocketPtr | mw_sub_ |
ZeroMQPoller | mw_sub_poller_ |
std::vector< ZmqLingeringSocketPtr<> > | qw_pull_ |
std::vector< ZeroMQPoller > | qw_pull_poller_ |
std::vector< ZmqLingeringSocketPtr<> > | qw_push_ |
std::vector< ZeroMQPoller > | qw_push_poller_ |
zmq::send_flags | send_flag_ = zmq::send_flags::none |
ZmqLingeringSocketPtr | this_worker_qw_pull_ |
ZmqLingeringSocketPtr | this_worker_qw_push_ |
ZmqLingeringSocketPtr | wm_pull_ |
ZeroMQPoller | wm_pull_poller_ |
ZmqLingeringSocketPtr | wm_push_ |
|
strong |
Enumerator | |
---|---|
fromQonM | |
fromMonQ | |
fromWonQ | |
fromQonW |
Definition at line 47 of file Messenger_decl.h.
|
strong |
Enumerator | |
---|---|
M2Q | |
Q2M | |
Q2W | |
W2Q |
Definition at line 40 of file Messenger_decl.h.
|
explicit |
Definition at line 63 of file Messenger.cxx.
RooFit::MultiProcess::Messenger::~Messenger | ( | ) |
Definition at line 214 of file Messenger.cxx.
|
inlineprivate |
Definition at line 112 of file Messenger_decl.h.
std::pair< ZeroMQPoller, std::size_t > RooFit::MultiProcess::Messenger::create_queue_poller | ( | ) |
Helper function that creates a poller for Queue::loop()
Definition at line 431 of file Messenger.cxx.
std::pair< ZeroMQPoller, std::size_t > RooFit::MultiProcess::Messenger::create_worker_poller | ( | ) |
Helper function that creates a poller for worker_loop()
Definition at line 442 of file Messenger.cxx.
|
private |
Function called from send and receive template functions in debug builds used to monitor the messages that are going to be sent or are received.
By defining this in the implementation file, compilation is a lot faster during debugging of Messenger or communication protocols.
Definition at line 520 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::publish_from_master_to_workers | ( | T && | item | ) |
specialization that sends the final message
Definition at line 150 of file Messenger.h.
void RooFit::MultiProcess::Messenger::publish_from_master_to_workers | ( | T && | item, |
T2 && | item2, | ||
Ts &&... | items | ||
) |
specialization that queues first parts of multipart messages
Definition at line 163 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_master_on_queue |
Definition at line 132 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_master_on_worker | ( | bool * | more = nullptr | ) |
Definition at line 176 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_queue_on_master |
Definition at line 103 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_queue_on_worker |
Definition at line 72 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_worker_on_master | ( | bool * | more = nullptr | ) |
Definition at line 219 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_worker_on_queue | ( | std::size_t | this_worker_id | ) |
Definition at line 43 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_master_to_queue | ( | ) |
Definition at line 460 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_master_to_queue | ( | T | item, |
Ts... | items | ||
) |
Definition at line 118 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_queue_to_master | ( | ) |
Definition at line 458 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_queue_to_master | ( | T | item, |
Ts... | items | ||
) |
Definition at line 89 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_queue_to_worker | ( | std::size_t | this_worker_id | ) |
Definition at line 454 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_queue_to_worker | ( | std::size_t | this_worker_id, |
T | item, | ||
Ts... | items | ||
) |
Definition at line 58 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_worker_to_master | ( | T && | item | ) |
specialization that sends the final message
Definition at line 192 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_worker_to_master | ( | T && | item, |
T2 && | item2, | ||
Ts &&... | items | ||
) |
specialization that queues first parts of multipart messages
Definition at line 205 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_worker_to_queue | ( | ) |
Definition at line 452 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_worker_to_queue | ( | T | item, |
Ts... | items | ||
) |
Definition at line 29 of file Messenger.h.
void RooFit::MultiProcess::Messenger::set_send_flag | ( | zmq::send_flags | flag | ) |
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
Definition at line 463 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::test_connections | ( | const ProcessManager & | process_manager | ) |
Test whether push-pull sockets are working.
process_manager | ProcessManager object used to instantiate this object. Used to identify which process we are running on and hence which sockets need to be tested. |
Definition at line 353 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::test_receive | ( | X2X | expected_ping_value, |
test_rcv_pipes | rcv_pipe, | ||
std::size_t | worker_id | ||
) |
Definition at line 290 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::test_send | ( | X2X | ping_value, |
test_snd_pipes | snd_pipe, | ||
std::size_t | worker_id | ||
) |
Definition at line 260 of file Messenger.cxx.
|
private |
Definition at line 149 of file Messenger_decl.h.
|
private |
Definition at line 143 of file Messenger_decl.h.
|
private |
Definition at line 145 of file Messenger_decl.h.
|
private |
Definition at line 144 of file Messenger_decl.h.
|
private |
Definition at line 128 of file Messenger_decl.h.
|
private |
Definition at line 131 of file Messenger_decl.h.
|
private |
Definition at line 121 of file Messenger_decl.h.
|
private |
Definition at line 124 of file Messenger_decl.h.
|
private |
Definition at line 134 of file Messenger_decl.h.
|
private |
Definition at line 135 of file Messenger_decl.h.
|
private |
Definition at line 136 of file Messenger_decl.h.
sigset_t RooFit::MultiProcess::Messenger::ppoll_sigmask |
Definition at line 104 of file Messenger_decl.h.
|
private |
Definition at line 126 of file Messenger_decl.h.
|
private |
Definition at line 130 of file Messenger_decl.h.
|
private |
Definition at line 119 of file Messenger_decl.h.
|
private |
Definition at line 123 of file Messenger_decl.h.
|
private |
Definition at line 147 of file Messenger_decl.h.
|
private |
Definition at line 127 of file Messenger_decl.h.
|
private |
Definition at line 120 of file Messenger_decl.h.
|
private |
Definition at line 139 of file Messenger_decl.h.
|
private |
Definition at line 140 of file Messenger_decl.h.
|
private |
Definition at line 138 of file Messenger_decl.h.