Logo ROOT  
Reference Guide
worker.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
15
23
24#include <string>
25#include <unistd.h> // getpid, pid_t
26#include <cerrno> // EINTR
27#include <csignal> // sigprocmask etc
28
29namespace RooFit {
30namespace MultiProcess {
31
32static bool worker_loop_running = false;
33
35{
37}
38
39/// \brief The worker processes' event loop
40///
41/// Asks the queue process for tasks, polls for incoming messages from other
42/// processes and handles them.
44{
45 assert(JobManager::instance()->process_manager().is_worker());
47 Q2W message_q2w;
48
49 // use a flag to not ask twice
50 bool dequeue_acknowledged = true;
51
52 ZeroMQPoller poller;
53 std::size_t mw_sub_index;
54
55 std::tie(poller, mw_sub_index) = JobManager::instance()->messenger().create_worker_poller();
56
57 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
58 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
59
60 sigset_t sigmask;
61 sigemptyset(&sigmask);
62 sigaddset(&sigmask, SIGTERM);
63 sigprocmask(SIG_BLOCK, &sigmask, &JobManager::instance()->messenger().ppoll_sigmask);
64
65 // Before doing anything, check whether we have received a terminate signal while blocking signals!
66 // In this case, we also do that in the while condition.
68 try { // watch for error from ppoll (which is called inside receive functions) caused by SIGTERM from master
69
70 // try to dequeue a task
71 if (dequeue_acknowledged) { // don't ask twice
73 dequeue_acknowledged = false;
74 }
75
76 // wait for handshake from queue or update from SUB socket
77 auto poll_result = poller.ppoll(-1, &JobManager::instance()->messenger().ppoll_sigmask);
78 // because the poller may now have a waiting update from master over the SUB socket,
79 // but the queue socket could be first in the poll_result vector, and during handling
80 // of a new task it is possible we need to already receive the updated state over SUB,
81 // we have to then flip this boolean so that in the for loop when we reach the SUB
82 // socket's result, we can skip it (otherwise we will hang there, because no more
83 // updated state will be coming):
84 bool skip_sub = false;
85 // then process incoming messages from sockets
86 for (auto readable_socket : poll_result) {
87 // message comes from the master-worker SUB socket (first element):
88 if (readable_socket.first == mw_sub_index) {
89 if (!skip_sub) {
90 auto job_id = JobManager::instance()->messenger().receive_from_master_on_worker<std::size_t>();
92 }
93 } else { // from queue socket
95 switch (message_q2w) {
97 dequeue_acknowledged = true;
98 break;
99 }
101 dequeue_acknowledged = true;
102 auto job_id = JobManager::instance()->messenger().receive_from_queue_on_worker<std::size_t>();
105
106 // while loop, because multiple jobs may have updated state coming
107 while (state_id != JobManager::get_job_object(job_id)->get_state_id()) {
108 skip_sub = true;
109 auto job_id_for_state =
111 JobManager::get_job_object(job_id_for_state)->update_state();
112 }
113 if (RooFit::MultiProcess::Config::getTimingAnalysis()) ProcessTimer::start_timer("worker:eval_task:" + std::to_string(task_id));
115 if (RooFit::MultiProcess::Config::getTimingAnalysis()) ProcessTimer::end_timer("worker:eval_task:" + std::to_string(task_id));
117
118 break;
119 }
120 }
121 }
122 }
123
124 } catch (ZMQ::ppoll_error_t &e) {
126 try {
127 response = handle_zmq_ppoll_error(e);
128 } catch (std::logic_error &) {
129 printf("worker loop at PID %d got unhandleable ZMQ::ppoll_error_t\n", getpid());
130 throw;
131 }
132 if (response == zmq_ppoll_error_response::abort) {
133 break;
134 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
135 printf("EINTR in worker loop at PID %d but no SIGTERM received, continuing\n", getpid());
136 continue;
137 } else if (response == zmq_ppoll_error_response::retry) {
138 printf("EAGAIN from ppoll in worker loop at PID %d, continuing\n", getpid());
139 continue;
140 }
141 } catch (zmq::error_t &e) {
142 printf("unhandled zmq::error_t (not a ppoll_error_t) in worker loop at PID %d with errno %d: %s\n", getpid(),
143 e.num(), e.what());
144 throw;
145 }
146 }
147
149
150 // clean up signal management modifications
151 sigprocmask(SIG_SETMASK, &JobManager::instance()->messenger().ppoll_sigmask, nullptr);
152
153 worker_loop_running = false;
154}
155
156} // namespace MultiProcess
157} // namespace RooFit
#define e(i)
Definition: RSha256.hxx:103
static bool getTimingAnalysis()
Definition: Config.cxx:87
Messenger & messenger() const
Definition: JobManager.cxx:151
static JobManager * instance()
Definition: JobManager.cxx:49
static Job * get_job_object(std::size_t job_object_id)
Definition: JobManager.cxx:130
std::size_t get_state_id()
Get the current state identifier.
Definition: Job.cxx:145
virtual void send_back_task_result_from_worker(std::size_t task)=0
virtual void update_state()
Virtual function to update any necessary state on workers.
Definition: Job.cxx:142
virtual void evaluate_task(std::size_t task)=0
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition: Messenger.h:176
value_t receive_from_queue_on_worker()
Definition: Messenger.h:72
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
Definition: Messenger.cxx:440
static void start_timer(std::string section_name)
static void end_timer(std::string section_name)
Wrapper class for polling ZeroMQ sockets.
Definition: ZeroMQPoller.h:26
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
void worker_loop()
The worker processes' event loop.
Definition: worker.cxx:43
bool is_worker_loop_running()
Definition: worker.cxx:34
std::size_t State
Definition: types.h:23
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition: util.cxx:64
static bool worker_loop_running
Definition: worker.cxx:32
std::size_t Task
Definition: types.h:22
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition: Common.h:18