Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Queue.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
18
19namespace RooFit {
20namespace MultiProcess {
21
22/** \class Queue
23 * \brief Keeps a queue of tasks for workers and manages the queue process through its event loop
24 *
25 * The Queue maintains a set of tasks on the queue process by receiving them
26 * from the master process. Worker processes can request to pop them off the
27 * queue. The communication between these processes is handled inside
28 * 'Queue::loop()', the queue process's event loop that polls the Messenger's
29 * sockets for incoming messages and handles them when they come.
30 *
31 * The reason for this class is to get automatic load balancing between
32 * workers. By allowing workers to request tasks whenever they are ready to
33 * do work, we don't need to manually distribute work over workers and they
34 * will always have something to do until all tasks have been completed.
35 * The alternative simple strategy of just distributing all tasks evenly over
36 * workers will be suboptimal when tasks have different or even varying
37 * runtimes (this simple strategy could be implemented with a PUSH-PULL
38 * ZeroMQ socket from master to workers, which would distribute tasks in a
39 * round-robin fashion, which, indeed, does not do load balancing).
40 */
41
42/// Helper function for 'Queue::loop()'
///
/// Handles a single message that the queue process received from the master.
/// The only case handled in this switch is M2Q::enqueue: the ids that make up a
/// JobTask are read from the master, the task is handed to add(), and the
/// N_tasks_ counter is incremented.
///
/// \param message The master-to-queue message to handle.
///
/// NOTE(review): the signature line (listing line 43) is missing from this
/// view; the cross-reference section of this page gives it as
/// `void process_master_message(M2Q message)`.
44{
45 switch (message) {
46 case M2Q::enqueue: {
47 // enqueue task
48 auto job_object_id = JobManager::instance()->messenger().receive_from_master_on_queue<std::size_t>();
 // NOTE(review): listing lines 49-50 are missing from this view. state_id and task_id used below are
 // presumably received from the master there, in the same way as job_object_id -- TODO confirm.
51 JobTask job_task{job_object_id, state_id, task_id};
52 add(job_task);
 // Count every task that has been put on the queue.
53 N_tasks_++;
54 break;
55 }
56 }
57}
58
59/// Helper function for 'Queue::loop()'
///
/// Handles a single message that the queue process received from a worker.
/// The only case handled in this switch is W2Q::dequeue: the queue tries to
/// pop() a task; when that succeeds, the worker id, Q2W::dequeue_accepted and
/// the task's job, state and task ids are passed on in a call whose first line
/// is not visible in this listing.
///
/// \param this_worker_id Id of the worker that sent the message.
/// \param message The worker-to-queue message to handle.
60void Queue::process_worker_message(std::size_t this_worker_id, W2Q message)
61{
62 switch (message) {
63 case W2Q::dequeue: {
64 // dequeue task
65 JobTask job_task;
 // pop() fills job_task and reports through its return value whether a task was available.
66 bool popped = pop(job_task);
67 if (popped) {
68 // Note: below two commands should be run atomically for thread safety (if that ever becomes an issue)
 // NOTE(review): listing lines 69 and 71 are missing from this view. The next line is the argument list of a
 // call that starts on line 69 -- presumably Messenger::send_from_queue_to_worker -- and line 71 is
 // presumably the second of the "two commands" mentioned above. TODO confirm against the real file.
70 this_worker_id, Q2W::dequeue_accepted, job_task.job_id, job_task.state_id, job_task.task_id);
72 } else {
 // NOTE(review): the body of this branch (listing line 73) is missing from this view; presumably the
 // worker is told that no task could be dequeued -- TODO confirm.
74 }
75 break;
76 }
77 }
78}
79
80/// \brief The queue process's event loop
81///
82/// Polls for incoming messages from other processes and handles them.
///
/// Outline of what the visible code does:
///  - asserts that the current process is the queue process;
///  - obtains a poller and the index of the master/queue socket from the Messenger;
///  - blocks SIGTERM, storing the previous signal mask in the Messenger's ppoll_sigmask;
///  - calls ppoll with that stored mask and an infinite timeout, then dispatches every
///    readable socket to process_master_message() or process_worker_message();
///  - on ZMQ::ppoll_error_t, lets handle_zmq_ppoll_error() decide whether to leave the
///    loop (abort) or to keep polling (unknown_eintr, retry);
///  - finally restores the signal mask that was stored in ppoll_sigmask.
///
/// NOTE(review): listing lines 83 (signature; given as `void loop()` in the cross-reference
/// section of this page), 100, 108 and 117 are missing from this view.
84{
85 assert(JobManager::instance()->process_manager().is_queue());
86 ZeroMQPoller poller;
87 std::size_t mq_index;
88 std::tie(poller, mq_index) = JobManager::instance()->messenger().create_queue_poller();
89
90 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
91 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
92
93 sigset_t sigmask;
94 sigemptyset(&sigmask);
95 sigaddset(&sigmask, SIGTERM);
 // Block SIGTERM; the mask that was active before this call is written to ppoll_sigmask.
96 sigprocmask(SIG_BLOCK, &sigmask, &JobManager::instance()->messenger().ppoll_sigmask);
97
98 // Before doing anything, check whether we have received a terminate signal while blocking signals!
99 // In this case, we also do that in the while condition.
 // NOTE(review): the loop header (listing line 100) is missing from this view; the `break` and `continue`
 // statements below refer to that loop.
101 try { // watch for zmq_error from ppoll caused by SIGTERM from master
102 // poll: wait until status change (-1: infinite timeout)
103 auto poll_result = poller.ppoll(-1, &JobManager::instance()->messenger().ppoll_sigmask);
104 // then process incoming messages from sockets
105 for (auto readable_socket : poll_result) {
106 // message comes from the master/queue socket (first element):
107 if (readable_socket.first == mq_index) {
 // NOTE(review): listing line 108 is missing from this view; `message` is presumably received from
 // the master there -- TODO confirm.
109 process_master_message(message);
110 } else { // from a worker socket
 // NOTE(review): the "- 1" assumes that the master/queue socket occupies poller index 0 and that worker
 // sockets follow it in worker-id order -- TODO confirm against create_queue_poller().
111 auto this_worker_id = readable_socket.first - 1;
112 auto message = JobManager::instance()->messenger().receive_from_worker_on_queue<W2Q>(this_worker_id);
113 process_worker_message(this_worker_id, message);
114 }
115 }
116 } catch (ZMQ::ppoll_error_t &e) {
 // NOTE(review): the declaration of `response` (listing line 117) is missing from this view.
118 try {
119 response = handle_zmq_ppoll_error(e);
120 } catch (std::logic_error &) {
121 printf("queue loop got unhandleable ZMQ::ppoll_error_t\n");
122 throw;
123 }
 // abort leaves the event loop; the other two responses log a message and poll again.
124 if (response == zmq_ppoll_error_response::abort) {
125 break;
126 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
127 printf("EINTR in queue loop but no SIGTERM received, continuing\n");
128 continue;
129 } else if (response == zmq_ppoll_error_response::retry) {
130 printf("EAGAIN from ppoll in queue loop, continuing\n");
131 continue;
132 }
133 } catch (zmq::error_t &e) {
 // Any other zmq error is logged and rethrown to the caller.
134 printf("unhandled zmq::error_t (not a ppoll_error_t) in queue loop with errno %d: %s\n", e.num(), e.what());
135 throw;
136 }
137 }
138
139 // clean up signal management modifications
 // Reinstates the signal mask that was saved in ppoll_sigmask when SIGTERM was blocked above.
140 sigprocmask(SIG_SETMASK, &JobManager::instance()->messenger().ppoll_sigmask, nullptr);
141}
142
143} // namespace MultiProcess
144} // namespace RooFit
#define e(i)
Definition RSha256.hxx:103
static JobManager * instance()
value_t receive_from_worker_on_queue(std::size_t this_worker_id)
Definition Messenger.h:43
void send_from_queue_to_worker(std::size_t this_worker_id)
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
virtual bool pop(JobTask &job_task)=0
Have a worker ask for a task-message from the queue.
void loop()
The queue process's event loop.
Definition Queue.cxx:83
virtual void add(JobTask job_task)=0
Enqueue a task.
void process_master_message(M2Q message)
Helper function for 'Queue::loop()'.
Definition Queue.cxx:43
std::size_t N_tasks_at_workers_
Definition Queue.h:44
void process_worker_message(std::size_t this_worker_id, W2Q message)
Helper function for 'Queue::loop()'.
Definition Queue.cxx:60
Wrapper class for polling ZeroMQ sockets.
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
std::size_t State
Definition types.h:23
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
std::size_t Task
Definition types.h:22
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26
combined job_object, state and task identifier type
Definition types.h:25