Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
LikelihoodJob.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13#include "LikelihoodJob.h"
14
15#include "LikelihoodSerial.h"
27#include "RooRealVar.h"
28#include "RooNaNPacker.h"
29
30#include "TMath.h" // IsNaN
31
32namespace RooFit {
33namespace TestStatistics {
34
/// Constructs a LikelihoodJob around \p likelihood, forwarding the likelihood, the
/// calculation-clean flags and the shared offset to the LikelihoodWrapper base.
/// The numbers of event and component tasks start from the defaults in
/// MultiProcess::Config::LikelihoodJob; init_vars() then snapshots the parameters
/// that are later synchronized to the worker processes.
/// NOTE(review): original lines 40 and 43 are missing from this listing, so part of the
/// initializer list and part of the constructor body are not visible here.
35LikelihoodJob::LikelihoodJob(std::shared_ptr<RooAbsL> likelihood,
36 std::shared_ptr<WrapperCalculationCleanFlags> calculation_is_clean, SharedOffset offset)
37 : LikelihoodWrapper(std::move(likelihood), std::move(calculation_is_clean), std::move(offset)),
38 n_event_tasks_(MultiProcess::Config::LikelihoodJob::defaultNEventTasks),
39 n_component_tasks_(MultiProcess::Config::LikelihoodJob::defaultNComponentTasks),
41{
42 init_vars();
44}
45
46// This is a separate function (instead of just in ctor) for historical reasons.
47// Its predecessor RooRealMPFE::initVars() was used from multiple ctors, but also
48// from RooRealMPFE::constOptimizeTestStatistic at the end, which makes sense,
49// because it might change the set of variables. We may at some point want to do
50// this here as well.
//
// init_vars: clears vars_ and save_vars_, then fills vars_ with the likelihood's
// parameter objects themselves (add) and save_vars_ with clones of them (addClone).
// save_vars_ is the snapshot against which the master later compares vars_ to detect
// changed values or changed constant flags before notifying the workers.
// NOTE(review): the signature line (original line 51) is missing from this listing.
52{
53 // Empty current lists
54 vars_.removeAll();
55 save_vars_.removeAll();
56
57 // Retrieve non-constant parameters
// NOTE(review): the master-side update loop also tracks isConstant() changes on these
// entries, which suggests constant parameters may be included as well. Confirm what
// getParameters() returns here.
58 std::unique_ptr<RooArgSet> vars{likelihood_->getParameters()};
59 // TODO: make sure this is the right list of parameters, compare to original
60 // implementation in RooRealMPFE.cxx
61
// Positional (list) view, so that vars_[ix] and save_vars_[ix] refer to the same parameter.
62 RooArgList varList(*vars);
63
64 // Save in lists
65 vars_.add(varList);
66 save_vars_.addClone(varList);
67}
68
// Worker-side handler for state updates broadcast by the master (presumably
// LikelihoodJob::update_state; the signature, original line 69, is missing here).
// Does nothing on the master. The variable `mode` (received on missing line 74)
// selects the branch:
//  - parameter branch: one message holds a packed array of update_state_t records.
//    Each record sets the value of vars_.at(var_index) and, if it differs, its
//    constant flag. If a further message part follows, it holds the new KahanSum
//    offsets, which are copied into shared_offset_.
//  - the other branch receives a single bool and forwards it to
//    LikelihoodWrapper::enableOffsetting.
// NOTE(review): the case labels (original lines 78-79 and 106) are also missing.
70{
71 if (get_manager()->process_manager().is_worker()) {
72 bool more;
73
75 assert(more);
76
77 switch (mode) {
80 assert(more);
81 auto message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
// The payload is reinterpreted as raw update_state_t structs. This relies on the master
// and the workers sharing the same struct layout (presumably true because workers are
// forked from the master process; confirm).
82 auto message_begin = message.data<update_state_t>();
83 auto message_end = message_begin + message.size() / sizeof(update_state_t);
84 std::vector<update_state_t> to_update(message_begin, message_end);
85 for (auto const &item : to_update) {
86 RooRealVar *rvar = static_cast<RooRealVar *>(vars_.at(item.var_index));
87 rvar->setVal(static_cast<double>(item.value));
88 if (rvar->isConstant() != item.is_constant) {
89 rvar->setConstant(static_cast<bool>(item.is_constant));
90 }
91 }
92
// `more` reports whether another part of the same message follows.
93 if (more) {
94 // offsets also incoming
95 auto offsets_message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
96 assert(!more);
97 auto offsets_message_begin = offsets_message.data<ROOT::Math::KahanSum<double>>();
98 std::size_t N_offsets = offsets_message.size() / sizeof(ROOT::Math::KahanSum<double>);
99 shared_offset_.offsets().resize(N_offsets);
100 auto offsets_message_end = offsets_message_begin + N_offsets;
101 std::copy(offsets_message_begin, offsets_message_end, shared_offset_.offsets().begin());
102 }
103
104 break;
105 }
107 LikelihoodWrapper::enableOffsetting(get_manager()->messenger().receive_from_master_on_worker<bool>(&more));
108 assert(!more);
109 break;
110 }
111 }
112 }
113}
114
/// Returns the number of event-range tasks each evaluation is split into.
/// The value starts from n_event_tasks_. The automatic setting is resolved on lines
/// that are not visible in this listing (original lines 116 and 119-120 are missing).
/// The result is capped at likelihood_->getNEvents(), so it is 0 when the likelihood
/// has no events.
115/// \warning In automatic mode, this function can start MultiProcess (forks, starts workers, etc)!
117{
118 std::size_t val = n_event_tasks_;
121 }
122 if (val > likelihood_->getNEvents()) {
123 val = likelihood_->getNEvents();
124 }
125 return val;
126}
127
/// Returns the number of component tasks each evaluation is split into.
/// The value starts from n_component_tasks_. Under a condition on original line 131
/// (missing from this listing, as is the signature on line 128) it is replaced by 1.
/// The result is capped at likelihood_->getNComponents().
129{
130 std::size_t val = n_component_tasks_;
132 val = 1;
133 }
134 if (val > likelihood_->getNComponents()) {
135 val = likelihood_->getNComponents();
136 }
137 return val;
138}
139
// Master-side synchronization of parameters to the workers (the signature, original
// line 140, is missing from this listing). For every entry of vars_ whose value or
// constant flag differs from the snapshot in save_vars_, the snapshot is updated and an
// update_state_t{index, value, isConstant} record is queued. If anything changed, or if
// offsetting is on and the shared offsets differ from offsets_previous_, state_id_ is
// incremented and the records (plus the offsets, when they changed) are published to
// the workers. Does nothing on workers.
// NOTE(review): the publish calls and any refresh of offsets_previous_ sit partly on
// missing lines (176, 178, 180). Without such a refresh, the offsets would be resent
// on every call. Confirm against the full file.
141{
142 if (get_manager()->process_manager().is_master()) {
143 bool valChanged = false;
144 bool constChanged = false;
145 std::vector<update_state_t> to_update;
146 for (std::size_t ix = 0u; ix < static_cast<std::size_t>(vars_.size()); ++ix) {
147 valChanged = !vars_[ix].isIdentical(save_vars_[ix], true);
148 constChanged = (vars_[ix].isConstant() != save_vars_[ix].isConstant());
149
150 if (valChanged || constChanged) {
151 if (constChanged) {
152 (static_cast<RooRealVar *>(&save_vars_[ix]))->setConstant(vars_[ix].isConstant());
153 }
154 // TODO: Check with Wouter why he uses copyCache in MPFE; makes it very difficult to extend, because
155 // copyCache is protected (so must be friend). Moved setting value to if-block below.
156 // _saveVars[ix].copyCache(&_vars[ix]);
157
158 // send message to queue (which will relay to workers)
159 RooAbsReal *rar_val = dynamic_cast<RooAbsReal *>(&vars_[ix]);
160 if (rar_val) {
161 double val = rar_val->getVal();
// NOTE(review): the dynamic_cast result below is dereferenced without a null check.
// This assumes that every parameter that is a RooAbsReal is in fact a RooRealVar.
// Confirm.
162 dynamic_cast<RooRealVar *>(&save_vars_[ix])->setVal(val);
163 bool isC = vars_[ix].isConstant();
164 to_update.push_back(update_state_t{ix, val, isC});
165 }
166 }
167 }
168 bool update_offsets = isOffsetting() && shared_offset_.offsets() != offsets_previous_;
169 if (!to_update.empty() || update_offsets) {
// Bump the state id so that tasks queued after this update can be matched to it.
170 ++state_id_;
171 zmq::message_t message(to_update.begin(), to_update.end());
172 // always send Job id first! This is used in worker_loop to route the
173 // update_state call to the correct Job.
174 if (update_offsets) {
175 zmq::message_t offsets_message(shared_offset_.offsets().begin(), shared_offset_.offsets().end());
177 std::move(message), std::move(offsets_message));
179 } else {
181 std::move(message));
182 }
183 }
184 }
185}
186
191
/// Master-side evaluation (presumably LikelihoodJob::evaluate; the signature on original
/// line 192 is missing). If offsetting is requested and no offsets exist yet, the serial
/// likelihood is evaluated first to fill the shared offsets. One task per (event task,
/// component task) pair is then queued. The comment on original line 212 says the
/// results are awaited next (the call on line 213 is missing here). Finally the
/// per-task KahanSums are combined into result_. NaN payloads from the tasks are
/// accumulated with RooNaNPacker, and a NaN total is reported via
/// RooAbsReal::logEvalError. Does nothing on workers.
193{
194 if (get_manager()->process_manager().is_master()) {
195 // evaluate the serial likelihood to set the offsets
196 if (do_offset_ && shared_offset_.offsets().empty()) {
197 likelihood_serial_.evaluate();
198 // note: we don't need to get the offsets from the serial likelihood, because they are already coupled through
199 // the shared_ptr
200 }
201
202 // update parameters that changed since last calculation (or creation if first time)
204
205 // master fills queue with tasks
206 auto N_tasks = getNEventTasks() * getNComponentTasks();
207 for (std::size_t ix = 0; ix < N_tasks; ++ix) {
208 get_manager()->queue()->add({id_, state_id_, ix});
209 }
// Counter of outstanding tasks; receive_task_result_on_master reports completion when it reaches 0.
210 n_tasks_at_workers_ = N_tasks;
211
212 // wait for task results back from workers to master
214
215 RooNaNPacker packedNaN;
216
217 // Note: initializing result_ to results_[0] instead of zero-initializing it makes
218 // a difference due to Kahan sum precision. This way, a single-worker run gives
219 // the same result as a run with serial likelihood. Adding the terms to a zero
220 // initial sum can cancel the carry in some cases, causing divergent values.
// NOTE(review): results_[0] assumes at least one task result. If getNEventTasks() or
// getNComponentTasks() returns 0 (e.g. for a likelihood without events), N_tasks is 0
// and this indexes an empty vector. Confirm that this case cannot occur.
221 result_ = results_[0];
222 packedNaN.accumulate(results_[0].Sum());
223 for (auto item_it = results_.cbegin() + 1; item_it != results_.cend(); ++item_it) {
224 result_ += *item_it;
225 packedNaN.accumulate(item_it->Sum());
226 }
// Reset for the next evaluation.
227 results_.clear();
228
// Original line 230 (missing here) presumably stores a NaN carrying the payload in result_. Confirm.
229 if (packedNaN.getPayload() != 0) {
231 }
232
233 if (TMath::IsNaN(result_.Sum())) {
234 RooAbsReal::logEvalError(nullptr, GetName().c_str(), "function value is NAN");
235 }
236 }
237}
238
239// --- RESULT LOGISTICS ---
240
/// Worker side: sends result_ of the last evaluated task back to the master as a single
/// task_result_t. The record carries the job id, the Kahan sum and carry, and whether
/// evaluation errors were logged on this worker (the signature on original line 241 is
/// missing). When errors were logged, the local error log is cleared (the clearing call
/// on original line 247 is missing here), so only a boolean flag reaches the master.
242{
243 int numErrors = RooAbsReal::numEvalErrors();
244
245 if (numErrors) {
246 // Clear error list on local side
248 }
249
250 task_result_t task_result{id_, result_.Result(), result_.Carry(), numErrors > 0};
// The struct is copied byte-wise into the message; the master reads it back with the same layout.
251 zmq::message_t message(sizeof(task_result_t));
252 memcpy(message.data(), &task_result, sizeof(task_result_t));
253 get_manager()->messenger().send_from_worker_to_master(std::move(message));
254}
255
256bool LikelihoodJob::receive_task_result_on_master(const zmq::message_t &message)
257{
258 auto task_result = message.data<task_result_t>();
259 results_.emplace_back(task_result->value, task_result->carry);
260 if (task_result->has_errors) {
261 RooAbsReal::logEvalError(nullptr, "LikelihoodJob", "evaluation errors at the worker processes", "no servervalue");
262 }
264 bool job_completed = (n_tasks_at_workers_ == 0);
265 return job_completed;
266}
267
268// --- END OF RESULT LOGISTICS ---
269
/// Worker side: evaluates one task and leaves its value in result_.
/// The task index encodes an (event task, component task) pair as
/// task = component_task * getNEventTasks() + event_task. The event task selects the
/// fraction [section_first, section_last) of the data entries. The component task,
/// used only for LikelihoodType::sum, selects a contiguous range of components. In the
/// visible branches, offsets are subtracted only when section_last == 1, so each offset
/// is applied once per evaluation rather than once per event section.
/// NOTE(review): several case labels and conditions (original lines 290-291, 299, 301,
/// 319 and 333) are missing from this listing.
270void LikelihoodJob::evaluate_task(std::size_t task)
271{
272 assert(get_manager()->process_manager().is_worker());
273
274 double section_first = 0;
275 double section_last = 1;
276 if (getNEventTasks() > 1) {
277 std::size_t event_task = task % getNEventTasks();
278 std::size_t N_events = likelihood_->numDataEntries();
// Section boundaries are computed on whole entries first, so adjacent sections share
// exactly the same boundary value.
279 if (event_task > 0) {
280 std::size_t first = N_events * event_task / getNEventTasks();
281 section_first = static_cast<double>(first) / N_events;
282 }
283 if (event_task < getNEventTasks() - 1) {
284 std::size_t last = N_events * (event_task + 1) / getNEventTasks();
285 section_last = static_cast<double>(last) / N_events;
286 }
287 }
288
289 switch (likelihood_type_) {
// section_last keeps the literal value 1 only for the final event section, so the
// exact floating-point comparisons below are intentional.
292 result_ = likelihood_->evaluatePartition({section_first, section_last}, 0, 0);
293 if (do_offset_ && section_last == 1) {
294 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
295 result_ -= shared_offset_.offsets()[0];
296 }
297 break;
298 }
// This branch always evaluates the full range {0, 1}; its case label and offset
// condition are on missing lines 299 and 301.
300 result_ = likelihood_->evaluatePartition({0, 1}, 0, 0);
302 result_ -= shared_offset_.offsets()[0];
303 }
304 break;
305 }
306 case LikelihoodType::sum: {
307 std::size_t components_first = 0;
308 std::size_t components_last = likelihood_->getNComponents();
309 if (getNComponentTasks() > 1) {
310 std::size_t component_task = task / getNEventTasks();
311 components_first = likelihood_->getNComponents() * component_task / getNComponentTasks();
312 if (component_task == getNComponentTasks() - 1) {
313 components_last = likelihood_->getNComponents();
314 } else {
315 components_last = likelihood_->getNComponents() * (component_task + 1) / getNComponentTasks();
316 }
317 }
318
// NOTE(review): result_ is only accumulated (+=) below. Its reset for this task
// presumably happens on missing line 319. Confirm.
320 RooNaNPacker packedNaN;
321 for (std::size_t comp_ix = components_first; comp_ix < components_last; ++comp_ix) {
322 auto component_result = likelihood_->evaluatePartition({section_first, section_last}, comp_ix, comp_ix + 1);
323 packedNaN.accumulate(component_result.Sum());
324 if (do_offset_ && section_last == 1 &&
325 shared_offset_.offsets()[comp_ix] != ROOT::Math::KahanSum<double>(0, 0)) {
326 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
327 result_ += (component_result - shared_offset_.offsets()[comp_ix]);
328 } else {
329 result_ += component_result;
330 }
331 }
// Original line 333 (missing here) presumably stores a NaN carrying the payload in result_. Confirm.
332 if (packedNaN.getPayload() != 0) {
334 }
335
336 break;
337 }
338 }
339}
340
/// Enables or disables offsetting (presumably LikelihoodJob::enableOffsetting(bool flag);
/// the signature on original line 341 is missing). The flag is always forwarded to
/// likelihood_serial_, the serial likelihood that evaluate() uses to compute the
/// offsets. Under a condition on missing lines 344-345, a warning is printed explaining
/// that the gradient's serial likelihood on already-forked workers can no longer be
/// updated.
342{
343 likelihood_serial_.enableOffsetting(flag);
346 printf("WARNING: when calling MinuitFcnGrad::setOffsetting after the run has already been started the "
347 "MinuitFcnGrad::likelihood_in_gradient object (a LikelihoodSerial) on the workers can no longer be "
348 "updated! This function (LikelihoodJob::enableOffsetting) can in principle be used outside of "
349 "MinuitFcnGrad, but be aware of this limitation. To do a minimization with a different offsetting "
350 "setting, please delete all RooFit::MultiProcess based objects so that the forked processes are killed "
351 "and then set up a new RooMinimizer.\n");
353 }
354}
355
// PROCESS_VAL(p) expands to a switch case that stores the stringized enumerator name in
// the local `s`. It is only defined for the operator<< below and is undefined afterwards.
356#define PROCESS_VAL(p) \
357 case (p): s = #p; break;
358
/// Streams an update_state_mode as its enumerator name. The PROCESS_VAL cases on
/// original lines 363-364 are missing from this listing. Values without a case are
/// printed as their integer value.
359std::ostream &operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
360{
361 std::string s;
362 switch (value) {
365 default: s = std::to_string(static_cast<int>(value));
366 }
367 return out << s;
368}
369
370#undef PROCESS_VAL
371
372} // namespace TestStatistics
373} // namespace RooFit
#define PROCESS_VAL(p)
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char mode
The Kahan summation is a compensated summation algorithm, which significantly reduces the numerical error that accumulates when adding a sequence of finite-precision floating-point numbers.
Definition Util.h:122
bool isConstant() const
Check if the "Constant" attribute is set.
Definition RooAbsArg.h:361
void setConstant(bool value=true)
Abstract base class for objects that represent a real value and implements functionality common to all real-valued objects.
Definition RooAbsReal.h:59
double getVal(const RooArgSet *normalisationSet=nullptr) const
Evaluate object.
Definition RooAbsReal.h:103
static Int_t numEvalErrors()
Return the number of logged evaluation errors since the last clearing.
void logEvalError(const char *message, const char *serverValueString=nullptr) const
Log evaluation error message.
static void clearEvalErrorLog()
Clear the stack of evaluation error messages.
RooArgList is a container object that can hold multiple RooAbsArg objects.
Definition RooArgList.h:22
ProcessManager & process_manager() const
std::size_t id_
Definition Job.h:45
std::size_t state_id_
Definition Job.h:46
JobManager * get_manager()
Get JobManager instance; create and activate if necessary.
Definition Job.cxx:112
void gather_worker_results()
Wait for all tasks to be retrieved for the current Job.
Definition Job.cxx:126
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
void send_from_worker_to_master(T &&item)
specialization that sends the final message
Definition Messenger.h:192
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
virtual void add(JobTask job_task)=0
Enqueue a task.
void enableOffsetting(bool flag) override
bool receive_task_result_on_master(const zmq::message_t &message) override
void evaluate_task(std::size_t task) override
std::vector< ROOT::Math::KahanSum< double > > results_
void send_back_task_result_from_worker(std::size_t task) override
void update_state() override
Virtual function to update any necessary state on workers.
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
SharedOffset::OffsetVec offsets_previous_
ROOT::Math::KahanSum< double > result_
LikelihoodJob(std::shared_ptr< RooAbsL > _likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
LikelihoodWrapper(std::shared_ptr< RooAbsL > likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean_
Variable that can be changed from the outside.
Definition RooRealVar.h:37
void setVal(double value) override
Set value of variable to 'value'.
std::size_t State
Definition types.h:23
Namespace for new RooFit test statistic calculation.
Definition RooAbsData.h:50
std::ostream & operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or other types).
Definition JSONIO.h:26
Bool_t IsNaN(Double_t x)
Definition TMath.h:892
static constexpr std::size_t automaticNEventTasks
Definition Config.h:34
static constexpr std::size_t automaticNComponentTasks
Definition Config.h:35
Little struct that can pack a float into the unused bits of the mantissa of a NaN double.
float getPayload() const
Retrieve packed float.
double getNaNWithPayload() const
Retrieve a NaN with the current float payload packed into the mantissa.
void accumulate(double val)
Accumulate a packed float from another NaN into this.