Logo ROOT  
Reference Guide
ProcessManager.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
19
20#include <thread>
21#include <cstring> // for strsignal
22#include <sys/wait.h> // for wait
23#include <iostream>
24#include <unordered_set>
25
26namespace RooFit {
27namespace MultiProcess {
28
29/// \class ProcessManager
30/// \brief Fork processes for queue and workers
31///
32/// This class manages three types of processes:
33/// 1. master: the initial main process. It defines and enqueues tasks
34/// and processes results.
35/// 2. workers: a pool of processes that will try to take tasks from the
36/// queue. These are forked from master.
37/// 3. queue: This process runs the queue_loop and maintains the queue of
38/// tasks. It is also forked from master.
39///
40/// \param N_workers Number of worker processes to spawn.
41ProcessManager::ProcessManager(std::size_t N_workers) : N_workers_(N_workers)
42{
43 // Note: zmq context is automatically created in the ZeroMQSvc class and maintained as singleton,
44 // but we must close any possibly existing state before reusing it. This assumes that our Messenger
45 // is the only user of ZeroMQSvc and that there is only one Messenger at a time. Beware that
46 // this must be designed more carefully if either of these assumptions change! Note also that this
47 // call must be done before the ProcessManager forks new processes, otherwise the master process'
48 // context that will be cloned to all forked processes will be closed multiple times, which will
49 // hang, because the ZeroMQ context creates threads and these will not be cloned along with the
50 // fork. See the ZeroMQ documentation for more details on this. In principle, one could design this
51 // in a more fine-grained way by keeping the context on the master process and only recreating it
52 // on child processes (while avoiding calling the destructor on the child processes!). This
53 // approach may offer more flexibility if this is needed in the future.
56}
57
59{
60 if (is_master()) {
61 terminate();
62 } else {
64 }
65}
66
67// static member initialization
68volatile sig_atomic_t ProcessManager::sigterm_received_ = 0;
69
70// static function
71/// We need this to tell the children to die, because we can't talk
72/// to them anymore during JobManager destruction, because that kills
73/// the Messenger first. We do that with SIGTERMs. The sigterm_received()
74/// should be checked in message loops to stop them when it's true.
76{
78}
79
80// static function
82{
83 if (sigterm_received_ > 0) {
84 return true;
85 } else {
86 return false;
87 }
88}
89
91{
92 pid_t child_pid = fork();
93 int retries = 0;
94 while (child_pid == -1) {
95 if (retries < 3) {
96 ++retries;
97 printf("fork returned with error number %d, retrying after 1 second...\n", errno);
98 sleep(1);
99 child_pid = fork();
100 } else {
101 printf("fork returned with error number %d\n", errno);
102 throw std::runtime_error("fork returned with error 3 times, aborting!");
103 }
104 }
105 return child_pid;
106}
107
108/// \brief Fork processes and activate CPU pinning
109///
110/// \param cpu_pinning Activate CPU pinning if true. Effective on Linux only.
112{
113 // Initialize processes;
114 // ... first workers:
115
116 // Setup process timer master and assign pid_t 999
118
119 worker_pids_.resize(N_workers_);
120 pid_t child_pid{};
121 for (std::size_t ix = 0; ix < N_workers_; ++ix) {
122 child_pid = fork_and_handle_errors();
123 if (!child_pid) { // we're on the worker
124 // Setup process timer, do not overwrite begin time, this keeps timing
125 // synced between worker and master processes. The forked process keeps
126 // the master process' begin time
128 is_worker_ = true;
129 worker_id_ = ix;
130 break;
131 } else { // we're on master
132 worker_pids_[ix] = child_pid;
133 }
134 }
135
136 // ... then queue:
137 if (child_pid) { // we're on master
139 if (!queue_pid_) { // we're now on queue
140 is_queue_ = true;
141 } else {
142 is_master_ = true;
143 }
144 }
145
146 // set the sigterm handler on the child processes
147 if (!is_master_) {
148 struct sigaction sa;
149 memset(&sa, '\0', sizeof(sa));
150 sa.sa_handler = ProcessManager::handle_sigterm;
151
152 if (sigaction(SIGTERM, &sa, nullptr) < 0) {
153 std::perror("sigaction failed");
154 std::exit(1);
155 }
156 }
157
158 if (cpu_pinning) {
159#if defined(__APPLE__)
160#ifndef NDEBUG
161 static bool affinity_warned = false;
162 if (is_master() & !affinity_warned) {
163 std::cout << "CPU affinity cannot be set on macOS" << std::endl;
164 affinity_warned = true;
165 }
166#endif // NDEBUG
167#elif defined(_WIN32)
168#ifndef NDEBUG
169 if (is_master())
170 std::cerr << "WARNING: CPU affinity setting not implemented on Windows, continuing..." << std::endl;
171#endif // NDEBUG
172#else
173 cpu_set_t mask;
174 // zero all bits in mask
175 CPU_ZERO(&mask);
176 // set correct bit
177 std::size_t set_cpu;
178 if (is_master()) {
179 set_cpu = N_workers() + 1;
180 } else if (is_queue()) {
181 set_cpu = N_workers();
182 } else {
183 set_cpu = worker_id();
184 }
185 CPU_SET(set_cpu, &mask);
186#ifndef NDEBUG
187 // sched_setaffinity returns 0 on success
188 if (sched_setaffinity(0, sizeof(mask), &mask) == -1) {
189 std::cerr << "WARNING: Could not set CPU affinity, continuing..." << std::endl;
190 } else {
191 std::cerr << "CPU affinity set to cpu " << set_cpu << " in process " << getpid() << std::endl;
192 }
193#endif // NDEBUG
194#endif
195 }
196
197#ifndef NDEBUG
199#endif // NDEBUG
200
201 initialized_ = true;
202}
203
205{
206 return initialized_;
207}
208
209/// Shutdown forked processes if on master and if this process manager is initialized
211{
212 try {
213 if (is_master() && is_initialized()) {
215 }
216 } catch (const std::exception &e) {
217 std::cerr << "WARNING: something in ProcessManager::terminate threw an exception! Original exception message:\n"
218 << e.what() << std::endl;
219 }
220}
221
223{
224 if (!is_master()) {
225 while (!sigterm_received()) {
226 }
227 std::_Exit(0);
228 }
229}
230
232{
233 int status = 0;
234 pid_t pid;
235 do {
236 pid = wait(&status);
237 } while (-1 == pid && EINTR == errno); // retry on interrupted system call
238
239 if (0 != status) {
240 if (WIFEXITED(status)) {
241 printf("exited, status=%d\n", WEXITSTATUS(status));
242 } else if (WIFSIGNALED(status)) {
243 if (WTERMSIG(status) != SIGTERM) {
244 printf("killed by signal %d\n", WTERMSIG(status));
245 }
246 } else if (WIFSTOPPED(status)) {
247 printf("stopped by signal %d\n", WSTOPSIG(status));
248 } else if (WIFCONTINUED(status)) {
249 printf("continued\n");
250 }
251 }
252
253 if (-1 == pid) {
254 if (errno == ECHILD) {
255 printf("chill_wait: no children (got ECHILD error code from wait call), done\n");
256 } else {
257 throw std::runtime_error(std::string("chill_wait: error in wait call: ") + strerror(errno) +
258 std::string(", errno ") + std::to_string(errno));
259 }
260 }
261
262 return pid;
263}
264
265/// Shutdown forked processes if on master
267{
268 if (is_master()) {
270 // Give children some time to write to file
271 if (RooFit::MultiProcess::Config::getTimingAnalysis()) std::this_thread::sleep_for(std::chrono::seconds(2));
272 // terminate all children
273 std::unordered_set<pid_t> children;
274 children.insert(queue_pid_);
275 kill(queue_pid_, SIGTERM);
276 for (auto pid : worker_pids_) {
277 kill(pid, SIGTERM);
278 children.insert(pid);
279 }
280 // then wait for them to actually die and clean out the zombies
281 while (!children.empty()) {
282 pid_t pid = chill_wait();
283 children.erase(pid);
284 }
285 }
286
287 initialized_ = false;
288}
289
290// Getters
291
293{
294 return is_master_;
295}
296
298{
299 return is_queue_;
300}
301
303{
304 return is_worker_;
305}
306
307std::size_t ProcessManager::worker_id() const
308{
309 return worker_id_;
310}
311
312std::size_t ProcessManager::N_workers() const
313{
314 return N_workers_;
315}
316
317/// Print to stdout which type of process we are on and what its PID is (for debugging)
319{
320 if (is_worker_) {
321 printf("I'm a worker, PID %d\n", getpid());
322 } else if (is_master_) {
323 printf("I'm master, PID %d\n", getpid());
324 } else if (is_queue_) {
325 printf("I'm queue, PID %d\n", getpid());
326 } else {
327 printf("I'm not master, queue or worker, weird! PID %d\n", getpid());
328 }
329}
330
331} // namespace MultiProcess
332} // namespace RooFit
#define e(i)
Definition: RSha256.hxx:103
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
R__EXTERN C unsigned int sleep(unsigned int seconds)
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition: ZeroMQSvc.cpp:34
static bool getTimingAnalysis()
Definition: Config.cxx:87
void initialize_processes(bool cpu_pinning=true)
Fork processes and activate CPU pinning.
static void handle_sigterm(int signum)
We need this to tell the children to die, because we can't talk to them anymore during JobManager des...
void identify_processes() const
Print to stdout which type of process we are on and what its PID is (for debugging)
void terminate() noexcept
Shutdown forked processes if on master and if this process manager is initialized.
ProcessManager(std::size_t N_workers)
void shutdown_processes()
Shutdown forked processes if on master.
static volatile sig_atomic_t sigterm_received_
static void setup(pid_t proc, bool set_begin=true)
Definition: ProcessTimer.h:31
void close_context() const
Definition: ZeroMQSvc.cpp:114
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition: Common.h:18