Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RooFit::MultiProcess::Messenger Class Reference

Manages ZeroMQ sockets and wraps send and receive calls.

This class is used for all interprocess communication between the master, queue and worker processes. It sets up ZeroMQ sockets between all processes over IPC socket files stored in /tmp on the filesystem.

Several sockets are used for communication between different places for different purposes:

  • Master and queue processes each have a PUSH-PULL socket pair to directly send/receive data between only the master and queue processes. This is currently used mainly for sending tasks to the queue from master. The socket from queue back to master is used only to test connections and may be removed in the future.
  • The queue process also has a PUSH-PULL socket pair with each worker process. These are used by the workers to obtain tasks from the queue.
  • The master has a PUB socket that the workers subscribe to with SUB sockets. These are used to update state. Note that to ensure robust reception of all messages on the SUB socket, it's important to send over state in as little messages as possible. For instance, it's best to send arrays over in a single big message instead of sending over each element separately. This also improves performance, since each message has some fixed overhead.
  • Each worker has a PUSH socket connected to a PULL socket on master that is used to send back task results from workers to master in 'JobManager::retrieve()'.
Parameters
process_managerProcessManager instance which manages the master, queue and worker processes that we want to set up communication for in this Messenger.

Definition at line 33 of file Messenger_decl.h.

Public Types

enum class  test_rcv_pipes { fromQonM , fromMonQ , fromWonQ , fromQonW }
 
enum class  test_snd_pipes { M2Q , Q2M , Q2W , W2Q }
 

Public Member Functions

 Messenger (const ProcessManager &process_manager)
 
 ~Messenger ()
 
std::pair< ZeroMQPoller, std::size_t > create_queue_poller ()
 Helper function that creates a poller for Queue::loop()
 
std::pair< ZeroMQPoller, std::size_t > create_worker_poller ()
 Helper function that creates a poller for worker_loop()
 
template<typename T >
void publish_from_master_to_workers (T &&item)
 specialization that sends the final message
 
template<typename T , typename T2 , typename... Ts>
void publish_from_master_to_workers (T &&item, T2 &&item2, Ts &&...items)
 specialization that queues first parts of multipart messages
 
template<typename value_t >
value_t receive_from_master_on_queue ()
 
template<typename value_t >
value_t receive_from_master_on_worker (bool *more=nullptr)
 
template<typename value_t >
value_t receive_from_queue_on_master ()
 
template<typename value_t >
value_t receive_from_queue_on_worker ()
 
template<typename value_t >
value_t receive_from_worker_on_master (bool *more=nullptr)
 
template<typename value_t >
value_t receive_from_worker_on_queue (std::size_t this_worker_id)
 
void send_from_master_to_queue ()
 
template<typename T , typename... Ts>
void send_from_master_to_queue (T item, Ts... items)
 
void send_from_queue_to_master ()
 
template<typename T , typename... Ts>
void send_from_queue_to_master (T item, Ts... items)
 
void send_from_queue_to_worker (std::size_t this_worker_id)
 
template<typename T , typename... Ts>
void send_from_queue_to_worker (std::size_t this_worker_id, T item, Ts... items)
 
template<typename T >
void send_from_worker_to_master (T &&item)
 specialization that sends the final message
 
template<typename T , typename T2 , typename... Ts>
void send_from_worker_to_master (T &&item, T2 &&item2, Ts &&...items)
 specialization that queues first parts of multipart messages
 
void send_from_worker_to_queue ()
 
template<typename T , typename... Ts>
void send_from_worker_to_queue (T item, Ts... items)
 
void set_send_flag (zmq::send_flags flag)
 Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
 
void test_connections (const ProcessManager &process_manager)
 Test whether push-pull sockets are working.
 
void test_receive (X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
 
void test_send (X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
 

Public Attributes

sigset_t ppoll_sigmask
 

Private Member Functions

template<class T >
void bindAddr (T &socket, std::string &&addr)
 
void debug_print (std::string s)
 Function called from send and receive template functions in debug builds used to monitor the messages that are going to be sent or are received.
 

Private Attributes

std::vector< std::string > bound_ipc_addresses_
 
bool close_MQ_on_destruct_ = false
 
bool close_QW_container_on_destruct_ = false
 
bool close_this_QW_on_destruct_ = false
 
ZmqLingeringSocketPtr mq_pull_
 
ZeroMQPoller mq_pull_poller_
 
ZmqLingeringSocketPtr mq_push_
 
ZeroMQPoller mq_push_poller_
 
ZmqLingeringSocketPtr mw_pub_
 
ZmqLingeringSocketPtr mw_sub_
 
ZeroMQPoller mw_sub_poller_
 
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
 
std::vector< ZeroMQPollerqw_pull_poller_
 
std::vector< ZmqLingeringSocketPtr<> > qw_push_
 
std::vector< ZeroMQPollerqw_push_poller_
 
zmq::send_flags send_flag_ = zmq::send_flags::none
 
ZmqLingeringSocketPtr this_worker_qw_pull_
 
ZmqLingeringSocketPtr this_worker_qw_push_
 
ZmqLingeringSocketPtr wm_pull_
 
ZeroMQPoller wm_pull_poller_
 
ZmqLingeringSocketPtr wm_push_
 

#include </home/sftnight/build/workspace/root-makedoc-master/rootspi/rdoc/src/master/roofit/multiprocess/res/RooFit/MultiProcess/Messenger_decl.h>

Member Enumeration Documentation

◆ test_rcv_pipes

Enumerator
fromQonM 
fromMonQ 
fromWonQ 
fromQonW 

Definition at line 47 of file Messenger_decl.h.

◆ test_snd_pipes

Enumerator
M2Q 
Q2M 
Q2W 
W2Q 

Definition at line 40 of file Messenger_decl.h.

Constructor & Destructor Documentation

◆ Messenger()

RooFit::MultiProcess::Messenger::Messenger ( const ProcessManager process_manager)
explicit

Definition at line 63 of file Messenger.cxx.

◆ ~Messenger()

RooFit::MultiProcess::Messenger::~Messenger ( )

Definition at line 214 of file Messenger.cxx.

Member Function Documentation

◆ bindAddr()

template<class T >
void RooFit::MultiProcess::Messenger::bindAddr ( T &  socket,
std::string &&  addr 
)
inlineprivate

Definition at line 112 of file Messenger_decl.h.

◆ create_queue_poller()

std::pair< ZeroMQPoller, std::size_t > RooFit::MultiProcess::Messenger::create_queue_poller ( )

Helper function that creates a poller for Queue::loop()

Definition at line 431 of file Messenger.cxx.

◆ create_worker_poller()

std::pair< ZeroMQPoller, std::size_t > RooFit::MultiProcess::Messenger::create_worker_poller ( )

Helper function that creates a poller for worker_loop()

Definition at line 442 of file Messenger.cxx.

◆ debug_print()

void RooFit::MultiProcess::Messenger::debug_print ( std::string  s)
private

Function called from send and receive template functions in debug builds used to monitor the messages that are going to be sent or are received.

By defining this in the implementation file, compilation is a lot faster during debugging of Messenger or communication protocols.

Definition at line 520 of file Messenger.cxx.

◆ publish_from_master_to_workers() [1/2]

template<typename T >
void RooFit::MultiProcess::Messenger::publish_from_master_to_workers ( T &&  item)

specialization that sends the final message

Definition at line 150 of file Messenger.h.

◆ publish_from_master_to_workers() [2/2]

template<typename T , typename T2 , typename... Ts>
void RooFit::MultiProcess::Messenger::publish_from_master_to_workers ( T &&  item,
T2 &&  item2,
Ts &&...  items 
)

specialization that queues first parts of multipart messages

Definition at line 163 of file Messenger.h.

◆ receive_from_master_on_queue()

template<typename value_t >
value_t RooFit::MultiProcess::Messenger::receive_from_master_on_queue

Definition at line 132 of file Messenger.h.

◆ receive_from_master_on_worker()

template<typename value_t >
value_t RooFit::MultiProcess::Messenger::receive_from_master_on_worker ( bool more = nullptr)

Definition at line 176 of file Messenger.h.

◆ receive_from_queue_on_master()

template<typename value_t >
value_t RooFit::MultiProcess::Messenger::receive_from_queue_on_master

Definition at line 103 of file Messenger.h.

◆ receive_from_queue_on_worker()

template<typename value_t >
value_t RooFit::MultiProcess::Messenger::receive_from_queue_on_worker

Definition at line 72 of file Messenger.h.

◆ receive_from_worker_on_master()

template<typename value_t >
value_t RooFit::MultiProcess::Messenger::receive_from_worker_on_master ( bool more = nullptr)

Definition at line 219 of file Messenger.h.

◆ receive_from_worker_on_queue()

template<typename value_t >
value_t RooFit::MultiProcess::Messenger::receive_from_worker_on_queue ( std::size_t  this_worker_id)

Definition at line 43 of file Messenger.h.

◆ send_from_master_to_queue() [1/2]

void RooFit::MultiProcess::Messenger::send_from_master_to_queue ( )

Definition at line 460 of file Messenger.cxx.

◆ send_from_master_to_queue() [2/2]

template<typename T , typename... Ts>
void RooFit::MultiProcess::Messenger::send_from_master_to_queue ( item,
Ts...  items 
)

Definition at line 118 of file Messenger.h.

◆ send_from_queue_to_master() [1/2]

void RooFit::MultiProcess::Messenger::send_from_queue_to_master ( )

Definition at line 458 of file Messenger.cxx.

◆ send_from_queue_to_master() [2/2]

template<typename T , typename... Ts>
void RooFit::MultiProcess::Messenger::send_from_queue_to_master ( item,
Ts...  items 
)

Definition at line 89 of file Messenger.h.

◆ send_from_queue_to_worker() [1/2]

void RooFit::MultiProcess::Messenger::send_from_queue_to_worker ( std::size_t  this_worker_id)

Definition at line 454 of file Messenger.cxx.

◆ send_from_queue_to_worker() [2/2]

template<typename T , typename... Ts>
void RooFit::MultiProcess::Messenger::send_from_queue_to_worker ( std::size_t  this_worker_id,
item,
Ts...  items 
)

Definition at line 58 of file Messenger.h.

◆ send_from_worker_to_master() [1/2]

template<typename T >
void RooFit::MultiProcess::Messenger::send_from_worker_to_master ( T &&  item)

specialization that sends the final message

Definition at line 192 of file Messenger.h.

◆ send_from_worker_to_master() [2/2]

template<typename T , typename T2 , typename... Ts>
void RooFit::MultiProcess::Messenger::send_from_worker_to_master ( T &&  item,
T2 &&  item2,
Ts &&...  items 
)

specialization that queues first parts of multipart messages

Definition at line 205 of file Messenger.h.

◆ send_from_worker_to_queue() [1/2]

void RooFit::MultiProcess::Messenger::send_from_worker_to_queue ( )

Definition at line 452 of file Messenger.cxx.

◆ send_from_worker_to_queue() [2/2]

template<typename T , typename... Ts>
void RooFit::MultiProcess::Messenger::send_from_worker_to_queue ( item,
Ts...  items 
)

Definition at line 29 of file Messenger.h.

◆ set_send_flag()

void RooFit::MultiProcess::Messenger::set_send_flag ( zmq::send_flags  flag)

Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.

Definition at line 463 of file Messenger.cxx.

◆ test_connections()

void RooFit::MultiProcess::Messenger::test_connections ( const ProcessManager process_manager)

Test whether push-pull sockets are working.

Note
This function tests the PUSH-PULL socket pairs only. The PUB-SUB sockets are already tested in the constructor.
Parameters
process_managerProcessManager object used to instantiate this object. Used to identify which process we are running on and hence which sockets need to be tested.

Definition at line 353 of file Messenger.cxx.

◆ test_receive()

void RooFit::MultiProcess::Messenger::test_receive ( X2X  expected_ping_value,
test_rcv_pipes  rcv_pipe,
std::size_t  worker_id 
)

Definition at line 290 of file Messenger.cxx.

◆ test_send()

void RooFit::MultiProcess::Messenger::test_send ( X2X  ping_value,
test_snd_pipes  snd_pipe,
std::size_t  worker_id 
)

Definition at line 260 of file Messenger.cxx.

Member Data Documentation

◆ bound_ipc_addresses_

std::vector<std::string> RooFit::MultiProcess::Messenger::bound_ipc_addresses_
private

Definition at line 149 of file Messenger_decl.h.

◆ close_MQ_on_destruct_

bool RooFit::MultiProcess::Messenger::close_MQ_on_destruct_ = false
private

Definition at line 143 of file Messenger_decl.h.

◆ close_QW_container_on_destruct_

bool RooFit::MultiProcess::Messenger::close_QW_container_on_destruct_ = false
private

Definition at line 145 of file Messenger_decl.h.

◆ close_this_QW_on_destruct_

bool RooFit::MultiProcess::Messenger::close_this_QW_on_destruct_ = false
private

Definition at line 144 of file Messenger_decl.h.

◆ mq_pull_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::mq_pull_
private

Definition at line 128 of file Messenger_decl.h.

◆ mq_pull_poller_

ZeroMQPoller RooFit::MultiProcess::Messenger::mq_pull_poller_
private

Definition at line 131 of file Messenger_decl.h.

◆ mq_push_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::mq_push_
private

Definition at line 121 of file Messenger_decl.h.

◆ mq_push_poller_

ZeroMQPoller RooFit::MultiProcess::Messenger::mq_push_poller_
private

Definition at line 124 of file Messenger_decl.h.

◆ mw_pub_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::mw_pub_
private

Definition at line 134 of file Messenger_decl.h.

◆ mw_sub_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::mw_sub_
private

Definition at line 135 of file Messenger_decl.h.

◆ mw_sub_poller_

ZeroMQPoller RooFit::MultiProcess::Messenger::mw_sub_poller_
private

Definition at line 136 of file Messenger_decl.h.

◆ ppoll_sigmask

sigset_t RooFit::MultiProcess::Messenger::ppoll_sigmask

Definition at line 104 of file Messenger_decl.h.

◆ qw_pull_

std::vector<ZmqLingeringSocketPtr<> > RooFit::MultiProcess::Messenger::qw_pull_
private

Definition at line 126 of file Messenger_decl.h.

◆ qw_pull_poller_

std::vector<ZeroMQPoller> RooFit::MultiProcess::Messenger::qw_pull_poller_
private

Definition at line 130 of file Messenger_decl.h.

◆ qw_push_

std::vector<ZmqLingeringSocketPtr<> > RooFit::MultiProcess::Messenger::qw_push_
private

Definition at line 119 of file Messenger_decl.h.

◆ qw_push_poller_

std::vector<ZeroMQPoller> RooFit::MultiProcess::Messenger::qw_push_poller_
private

Definition at line 123 of file Messenger_decl.h.

◆ send_flag_

zmq::send_flags RooFit::MultiProcess::Messenger::send_flag_ = zmq::send_flags::none
private

Definition at line 147 of file Messenger_decl.h.

◆ this_worker_qw_pull_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::this_worker_qw_pull_
private

Definition at line 127 of file Messenger_decl.h.

◆ this_worker_qw_push_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::this_worker_qw_push_
private

Definition at line 120 of file Messenger_decl.h.

◆ wm_pull_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::wm_pull_
private

Definition at line 139 of file Messenger_decl.h.

◆ wm_pull_poller_

ZeroMQPoller RooFit::MultiProcess::Messenger::wm_pull_poller_
private

Definition at line 140 of file Messenger_decl.h.

◆ wm_push_

ZmqLingeringSocketPtr RooFit::MultiProcess::Messenger::wm_push_
private

Definition at line 138 of file Messenger_decl.h.

  • roofit/multiprocess/res/RooFit/MultiProcess/Messenger_decl.h
  • roofit/multiprocess/res/RooFit/MultiProcess/Messenger.h
  • roofit/multiprocess/src/Messenger.cxx