Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ZeroMQSvc.h
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * RA, Roel Aaij, NIKHEF
5 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
14#ifndef ZEROMQ_IZEROMQSVC_H
15#define ZEROMQ_IZEROMQSVC_H 1
16
17#include <zmq.hpp>
18#include "RooFit_ZMQ/Utility.h"
20
21#include <type_traits>
22#include <string>
23#include <vector>
24#include <sstream>
25#include <ios>
26#include <iostream> // std::cerr
27#include <functional>
28#include <memory>
#include <cstring>   // std::memcpy
#include <stdexcept> // std::length_error
29
30// debugging
31#include <unistd.h> // getpid
32
namespace ZMQ {

/// Exception thrown by ZeroMQSvc::receive when the underlying receive call
/// comes back without a result.
struct TimeOutException : public std::exception {
   TimeOutException() = default;
};

/// Additional exception type of the ZMQ helpers; it carries no payload.
/// NOTE(review): no throw site is visible in this header -- confirm its intended use.
struct MoreException : public std::exception {
   MoreException() = default;
};

} // namespace ZMQ
44
45template <int PERIOD = 0>
47 void operator()(zmq::socket_t *socket)
48 {
49 int tries = 0;
50 int max_tries = 3;
51 while (true) {
52 try {
53 // the actual work this function should do, plus the delete socket below:
54 if (socket)
55 socket->set(zmq::sockopt::linger, PERIOD);
56 break;
57 } catch (zmq::error_t &e) {
58 if (++tries == max_tries || e.num() == EINVAL || e.num() == ETERM ||
59 e.num() == ENOTSOCK // not recoverable from here
60 ) {
61 std::cerr << "ERROR in ZeroMQSvc::socket: " << e.what() << " (errno: " << e.num() << ")\n";
62 throw;
63 }
64 std::cerr << "RETRY " << tries << "/" << (max_tries - 1)
65 << " in ZmqLingeringSocketPtrDeleter: call interrupted (errno: " << e.num() << ")\n";
66 }
67 }
68
69 delete socket;
70 }
71};
72
/// Owning pointer to a zmq::socket_t whose deleter is
/// ZmqLingeringSocketPtrDeleter<PERIOD>; PERIOD defaults to 0.
73template <int PERIOD = 0>
74using ZmqLingeringSocketPtr = std::unique_ptr<zmq::socket_t, ZmqLingeringSocketPtrDeleter<PERIOD>>;
75
76// We retry send and receive only on EINTR, all other errors are either fatal, or can only
77// be handled at the caller.
78template <typename... args_t>
79auto retry_send(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.send(args...))
80{
81 int tries = 0;
82 while (true) {
83 try {
84 // the actual work this function should do, all the rest is error handling:
85 return socket.send(args...);
86 } catch (zmq::error_t &e) {
87 if (++tries == max_tries || e.num() != EINTR // only recoverable error
88 ) {
89 throw;
90 }
91 std::cerr << "RETRY " << tries << "/" << (max_tries - 1) << " in ZeroMQSvc::send (retry_send) on pid "
92 << getpid() << ": " << e.what() << ")\n";
93 }
94 }
95}
96
97template <typename... args_t>
98auto retry_recv(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.recv(args...))
99{
100 int tries = 0;
101 while (true) {
102 try {
103 // the actual work this function should do, all the rest is error handling:
104 return socket.recv(args...);
105 } catch (zmq::error_t &e) {
106 if (++tries == max_tries || e.num() != EINTR // only recoverable error
107 ) {
108 throw;
109 }
110 std::cerr << "RETRY " << tries << "/" << (max_tries - 1) << " in ZeroMQSvc::recv (retry_recv) on pid "
111 << getpid() << ": " << e.what() << ")\n";
112 }
113 }
114}
115
117 // Note on error handling:
118 // Creating message_t can throw, but only when memory ran out (errno ENOMEM),
119 // and that is something only the caller can fix, so we don't catch it here.
120
121public:
/// Encoding modes of the service: Text (value 0) or Binary.
122 enum Encoding { Text = 0, Binary };
123
/// Accessor for the encoding mode; defined out of line in ZeroMQSvc.cpp.
124 Encoding encoding() const;
/// Set encoding mode.
125 void setEncoding(const Encoding &e);
/// Get context.
126 zmq::context_t &context() const;
/// Returns a socket of the requested type by value (definition not part of this header).
127 zmq::socket_t socket(zmq::socket_type type) const;
/// Create and return a new socket by pointer.
128 zmq::socket_t *socket_ptr(zmq::socket_type type) const;
/// NOTE(review): presumably shuts down the context held in m_context -- confirm against ZeroMQSvc.cpp.
129 void close_context() const;
130
131 /// decode message with ZMQ, POD version
132 template <class T, typename std::enable_if<!std::is_pointer<T>::value && ZMQ::Detail::is_trivial<T>::value, T>::type
133 * = nullptr>
134 T decode(const zmq::message_t &msg) const
135 {
136 T object;
137 memcpy(&object, msg.data(), msg.size());
138 return object;
139 }
140
141 /// decode ZMQ message, string version
142 template <class T, typename std::enable_if<std::is_same<T, std::string>::value, T>::type * = nullptr>
143 std::string decode(const zmq::message_t &msg) const
144 {
145 std::string r(msg.size() + 1, char{});
146 r.assign(static_cast<const char *>(msg.data()), msg.size());
147 return r;
148 }
149
150 /// receive message with ZMQ, general version
151 // FIXME: what to do with flags=nullptr.... more is a pointer, that might prevent conversion
152 template <class T, typename std::enable_if<!(std::is_same<zmq::message_t, T>::value), T>::type * = nullptr>
153 T receive(zmq::socket_t &socket, zmq::recv_flags flags = zmq::recv_flags::none, bool *more = nullptr) const
154 {
155 // receive message
156 zmq::message_t msg;
157 auto recv_result = retry_recv(socket, 2, std::ref(msg), flags);
158 if (!recv_result) {
159 throw ZMQ::TimeOutException{};
160 }
161 if (more)
162 *more = msg.more();
163
164 // decode message
165 return decode<T>(msg);
166 }
167
168 /// receive message with ZMQ
169 template <class T, typename std::enable_if<std::is_same<zmq::message_t, T>::value, T>::type * = nullptr>
170 T receive(zmq::socket_t &socket, zmq::recv_flags flags = zmq::recv_flags::none, bool *more = nullptr) const
171 {
172 // receive message
173 zmq::message_t msg;
174 auto recv_result = retry_recv(socket, 2, std::ref(msg), flags);
175 if (!recv_result) {
176 throw ZMQ::TimeOutException{};
177 }
178 if (more)
179 *more = msg.more();
180 return msg;
181 }
182
183 /// encode message to ZMQ
184 template <class T, typename std::enable_if<!std::is_pointer<T>::value && ZMQ::Detail::is_trivial<T>::value, T>::type
185 * = nullptr>
186 zmq::message_t encode(const T &item, std::function<size_t(const T &t)> sizeFun = ZMQ::defaultSizeOf<T>) const
187 {
188 size_t s = sizeFun(item);
189 zmq::message_t msg{s};
190 memcpy((void *)msg.data(), &item, s);
191 return msg;
192 }
193
/// Encode a C string into a ZMQ message; declared here, defined out of line.
194 zmq::message_t encode(const char *item) const;
/// Encode a std::string into a ZMQ message; declared here, defined out of line.
195 zmq::message_t encode(const std::string &item) const;
196
197 /// Send message with ZMQ
198 template <class T, typename std::enable_if<!std::is_same<T, zmq::message_t>::value, T>::type * = nullptr>
199 zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags = zmq::send_flags::none) const
200 {
201 return retry_send(socket, 1, encode(item), flags);
202 }
203
/// Send a C string on `socket`; declared here, defined out of line.
204 zmq::send_result_t
205 send(zmq::socket_t &socket, const char *item, zmq::send_flags flags = zmq::send_flags::none) const;
/// Send an existing message, passed by lvalue reference; declared here, defined out of line.
206 zmq::send_result_t
207 send(zmq::socket_t &socket, zmq::message_t &msg, zmq::send_flags flags = zmq::send_flags::none) const;
/// Send an existing message, passed by rvalue reference; declared here, defined out of line.
208 zmq::send_result_t
209 send(zmq::socket_t &socket, zmq::message_t &&msg, zmq::send_flags flags = zmq::send_flags::none) const;
210
211private:
213 mutable zmq::context_t *m_context = nullptr;
214};
215
217
218#endif // ZEROMQ_IZEROMQSVC_H
#define e(i)
Definition RSha256.hxx:103
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition ZeroMQSvc.h:74
auto retry_send(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.send(args...))
Definition ZeroMQSvc.h:79
auto retry_recv(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.recv(args...))
Definition ZeroMQSvc.h:98
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
Wrapper class for basic ZeroMQ context and socket management.
Definition ZeroMQSvc.h:116
Encoding m_enc
Definition ZeroMQSvc.h:212
zmq::context_t * m_context
Definition ZeroMQSvc.h:213
zmq::message_t encode(const T &item, std::function< size_t(const T &t)> sizeFun=ZMQ::defaultSizeOf< T >) const
encode message to ZMQ
Definition ZeroMQSvc.h:186
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
Definition ZeroMQSvc.h:199
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
Encoding encoding() const
Definition ZeroMQSvc.cpp:43
T decode(const zmq::message_t &msg) const
decode message with ZMQ, POD version
Definition ZeroMQSvc.h:134
zmq::context_t & context() const
Get context.
Definition ZeroMQSvc.cpp:63
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
Definition ZeroMQSvc.h:153
void close_context() const
void setEncoding(const Encoding &e)
Set encoding mode.
Definition ZeroMQSvc.cpp:53
std::string decode(const zmq::message_t &msg) const
decode ZMQ message, string version
Definition ZeroMQSvc.h:143
MoreException()=default
TimeOutException()=default
void operator()(zmq::socket_t *socket)
Definition ZeroMQSvc.h:47