Logo ROOT  
Reference Guide
Messenger_decl.h
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13#ifndef ROOT_ROOFIT_MultiProcess_Messenger_decl
14#define ROOT_ROOFIT_MultiProcess_Messenger_decl
15
19
#include <csignal> // sigprocmask, sigset_t, etc
#include <cstddef> // std::size_t
#include <iosfwd>
#include <string>
#include <utility> // std::move, std::pair
#include <vector>
24
25namespace RooFit {
26namespace MultiProcess {
27
29
/// Test messages; passed as the ping value to Messenger::test_send() and
/// Messenger::test_receive().
enum class X2X : int {
   ping = -1,
   pong = -2,
   initial_value = 0,
};
32
33class Messenger {
34public:
35 explicit Messenger(const ProcessManager &process_manager);
36 ~Messenger();
37
38 void test_connections(const ProcessManager &process_manager);
39
40 enum class test_snd_pipes {
41 M2Q,
42 Q2M,
43 Q2W,
44 W2Q,
45 };
46
47 enum class test_rcv_pipes {
48 fromQonM,
49 fromMonQ,
50 fromWonQ,
51 fromQonW,
52 };
53
54 std::pair<ZeroMQPoller, std::size_t> create_queue_poller();
55 std::pair<ZeroMQPoller, std::size_t> create_worker_poller();
56
57 // -- WORKER - QUEUE COMMUNICATION --
58
60 template <typename T, typename... Ts>
61 void send_from_worker_to_queue(T item, Ts... items);
62 template <typename value_t>
63 value_t receive_from_worker_on_queue(std::size_t this_worker_id);
64 void send_from_queue_to_worker(std::size_t this_worker_id);
65 template <typename T, typename... Ts>
66 void send_from_queue_to_worker(std::size_t this_worker_id, T item, Ts... items);
67 template <typename value_t>
69
70 // -- QUEUE - MASTER COMMUNICATION --
71
73
74 template <typename T, typename... Ts>
75 void send_from_queue_to_master(T item, Ts... items);
76 template <typename value_t>
79
80 template <typename T, typename... Ts>
81 void send_from_master_to_queue(T item, Ts... items);
82 template <typename value_t>
84
85 // -- MASTER - WORKER COMMUNICATION --
86
87 template <typename T>
89 template <typename T, typename T2, typename... Ts>
90 void publish_from_master_to_workers(T&& item, T2&& item2, Ts&&... items);
91 template <typename value_t>
92 value_t receive_from_master_on_worker(bool *more = nullptr);
93
95 template <typename T, typename... Ts>
96 void send_from_worker_to_master(T item, Ts... items);
97 template <typename value_t>
99
100 void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id);
101 void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id);
102
104
105 void set_send_flag(zmq::send_flags flag);
106
107private:
108 void debug_print(std::string s);
109
110 template<class T>
111 void bindAddr(T & socket, std::string && addr) {
112 bound_ipc_addresses_.emplace_back(addr);
113 socket->bind(bound_ipc_addresses_.back());
114 }
115
116 // push
117 std::vector<ZmqLingeringSocketPtr<>> qw_push_;
120 // pollers for all push sockets
121 std::vector<ZeroMQPoller> qw_push_poller_;
123 // pull
124 std::vector<ZmqLingeringSocketPtr<>> qw_pull_;
127 // pollers for all pull sockets
128 std::vector<ZeroMQPoller> qw_pull_poller_;
130
131 // publish/subscribe sockets for parameter updating from master to workers
135 // push/pull sockets for result retrieving from workers on master
139
140 // destruction flags to distinguish between different process-type setups:
144
145 zmq::send_flags send_flag_ = zmq::send_flags::none;
146
147 std::vector<std::string> bound_ipc_addresses_;
148};
149
/// Messages from master to queue.
enum class M2Q : int { enqueue = 10 };
154
/// Messages from worker to queue.
enum class W2Q : int {
   dequeue = 30,
};
157
/// Messages from queue to worker.
enum class Q2W : int { dequeue_rejected = 40, dequeue_accepted = 41 };
163
164// stream output operators for debugging
165std::ostream &operator<<(std::ostream &out, const M2Q value);
166std::ostream &operator<<(std::ostream &out, const Q2W value);
167std::ostream &operator<<(std::ostream &out, const W2Q value);
168std::ostream &operator<<(std::ostream &out, const X2X value);
169
170} // namespace MultiProcess
171} // namespace RooFit
172
173#endif // ROOT_ROOFIT_MultiProcess_Messenger_decl
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition: ZeroMQSvc.h:80
Manages ZeroMQ sockets and wraps send and receive calls.
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
Definition: Messenger.cxx:289
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
Definition: Messenger.cxx:351
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition: Messenger.h:176
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
Definition: Messenger.cxx:461
value_t receive_from_queue_on_worker()
Definition: Messenger.h:72
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
Messenger(const ProcessManager &process_manager)
Definition: Messenger.cxx:63
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
Definition: Messenger.cxx:522
value_t receive_from_worker_on_queue(std::size_t this_worker_id)
Definition: Messenger.h:43
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition: Messenger.h:150
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
Definition: Messenger.cxx:452
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
Definition: Messenger.cxx:259
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
Definition: Messenger.cxx:440
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
Definition: Messenger.cxx:429
Fork processes for queue and workers.
Wrapper class for polling ZeroMQ sockets.
Definition: ZeroMQPoller.h:26
#define T2
Definition: md5.inl:147
double T(double x)
Definition: ChebyshevPol.h:34
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
Definition: Messenger.cxx:24
std::ostream & operator<<(std::ostream &out, const M2Q value)
Definition: Messenger.cxx:474
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition: Common.h:18
static constexpr double s