Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
LikelihoodJob.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13#include "LikelihoodJob.h"
14
15#include "LikelihoodSerial.h"
27#include "RooRealVar.h"
28#include "RooNaNPacker.h"
29
30#include "TMath.h" // IsNaN
31
32namespace RooFit {
33namespace TestStatistics {
34
35LikelihoodJob::LikelihoodJob(std::shared_ptr<RooAbsL> likelihood,
36 std::shared_ptr<WrapperCalculationCleanFlags> calculation_is_clean, SharedOffset offset)
37 : LikelihoodWrapper(std::move(likelihood), std::move(calculation_is_clean), std::move(offset)),
38 n_event_tasks_(MultiProcess::Config::LikelihoodJob::defaultNEventTasks),
39 n_component_tasks_(MultiProcess::Config::LikelihoodJob::defaultNComponentTasks),
40 likelihood_serial_(likelihood_, calculation_is_clean_, shared_offset_)
41{
42 init_vars();
44}
45
46// This is a separate function (instead of just in ctor) for historical reasons.
47// Its predecessor RooRealMPFE::initVars() was used from multiple ctors, but also
48// from RooRealMPFE::constOptimizeTestStatistic at the end, which makes sense,
49// because it might change the set of variables. We may at some point want to do
50// this here as well.
52{
53 // Empty current lists
56
57 // Retrieve non-constant parameters
58 std::unique_ptr<RooArgSet> vars{likelihood_->getParameters()};
59 // TODO: make sure this is the right list of parameters, compare to original
60 // implementation in RooRealMPFE.cxx
61
62 RooArgList varList(*vars);
63
64 // Save in lists
65 vars_.add(varList);
66 save_vars_.addClone(varList);
67}
68
70{
71 if (get_manager()->process_manager().is_worker()) {
72 bool more;
73
75 assert(more);
76
77 switch (mode) {
80 assert(more);
81 auto message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
82 auto message_begin = message.data<update_state_t>();
83 auto message_end = message_begin + message.size() / sizeof(update_state_t);
84 std::vector<update_state_t> to_update(message_begin, message_end);
85 for (auto const &item : to_update) {
86 RooRealVar *rvar = static_cast<RooRealVar *>(vars_.at(item.var_index));
87 rvar->setVal(static_cast<double>(item.value));
88 if (rvar->isConstant() != item.is_constant) {
89 rvar->setConstant(static_cast<bool>(item.is_constant));
90 }
91 }
92
93 if (more) {
94 // offsets also incoming
95 auto offsets_message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
96 assert(!more);
97 auto offsets_message_begin = offsets_message.data<ROOT::Math::KahanSum<double>>();
98 std::size_t N_offsets = offsets_message.size() / sizeof(ROOT::Math::KahanSum<double>);
99 shared_offset_.offsets().resize(N_offsets);
100 auto offsets_message_end = offsets_message_begin + N_offsets;
101 std::copy(offsets_message_begin, offsets_message_end, shared_offset_.offsets().begin());
102 }
103
104 break;
105 }
107 LikelihoodWrapper::enableOffsetting(get_manager()->messenger().receive_from_master_on_worker<bool>(&more));
108 assert(!more);
109 break;
110 }
111 }
112 }
113}
114
115/// \warning In automatic mode, this function can start MultiProcess (forks, starts workers, etc)!
117{
118 std::size_t val = n_event_tasks_;
121 }
122 if (val > likelihood_->getNEvents()) {
123 val = likelihood_->getNEvents();
124 }
125 return val;
126}
127
129{
130 std::size_t val = n_component_tasks_;
132 val = 1;
133 }
134 if (val > likelihood_->getNComponents()) {
135 val = likelihood_->getNComponents();
136 }
137 return val;
138}
139
141{
142 if (get_manager()->process_manager().is_master()) {
143 bool valChanged = false;
144 bool constChanged = false;
145 std::vector<update_state_t> to_update;
146 for (std::size_t ix = 0u; ix < static_cast<std::size_t>(vars_.size()); ++ix) {
147 valChanged = !vars_[ix].isIdentical(save_vars_[ix], true);
148 constChanged = (vars_[ix].isConstant() != save_vars_[ix].isConstant());
149
150 if (valChanged || constChanged) {
151 if (constChanged) {
152 (static_cast<RooRealVar *>(&save_vars_[ix]))->setConstant(vars_[ix].isConstant());
153 }
154 // TODO: Check with Wouter why he uses copyCache in MPFE; makes it very difficult to extend, because
155 // copyCache is protected (so must be friend). Moved setting value to if-block below.
156 // _saveVars[ix].copyCache(&_vars[ix]);
157
158 // send message to queue (which will relay to workers)
159 RooAbsReal *rar_val = dynamic_cast<RooAbsReal *>(&vars_[ix]);
160 if (rar_val) {
161 double val = rar_val->getVal();
162 dynamic_cast<RooRealVar *>(&save_vars_[ix])->setVal(val);
163 bool isC = vars_[ix].isConstant();
164 to_update.push_back(update_state_t{ix, val, isC});
165 }
166 }
167 }
168 bool update_offsets = isOffsetting() && shared_offset_.offsets() != offsets_previous_;
169 if (!to_update.empty() || update_offsets) {
170 ++state_id_;
171 zmq::message_t message(to_update.begin(), to_update.end());
172 // always send Job id first! This is used in worker_loop to route the
173 // update_state call to the correct Job.
174 if (update_offsets) {
175 zmq::message_t offsets_message(shared_offset_.offsets().begin(), shared_offset_.offsets().end());
177 std::move(message), std::move(offsets_message));
179 } else {
181 std::move(message));
182 }
183 }
184 }
185}
186
188{
190}
191
193{
194 if (get_manager()->process_manager().is_master()) {
195 // evaluate the serial likelihood to set the offsets
196 if (do_offset_ && shared_offset_.offsets().empty()) {
198 // note: we don't need to get the offsets from the serial likelihood, because they are already coupled through
199 // the shared_ptr
200 }
201
202 // update parameters that changed since last calculation (or creation if first time)
204
205 // master fills queue with tasks
206 auto N_tasks = getNEventTasks() * getNComponentTasks();
207 for (std::size_t ix = 0; ix < N_tasks; ++ix) {
208 get_manager()->queue()->add({id_, state_id_, ix});
209 }
210 n_tasks_at_workers_ = N_tasks;
211
212 // wait for task results back from workers to master
214
215 RooNaNPacker packedNaN;
216
217 // Note: initializing result_ to results_[0] instead of zero-initializing it makes
218 // a difference due to Kahan sum precision. This way, a single-worker run gives
219 // the same result as a run with serial likelihood. Adding the terms to a zero
220 // initial sum can cancel the carry in some cases, causing divergent values.
221 result_ = results_[0];
222 packedNaN.accumulate(results_[0].Sum());
223 for (auto item_it = results_.cbegin() + 1; item_it != results_.cend(); ++item_it) {
224 result_ += *item_it;
225 packedNaN.accumulate(item_it->Sum());
226 }
227 results_.clear();
228
229 if (packedNaN.getPayload() != 0) {
231 }
232
233 if (TMath::IsNaN(result_.Sum())) {
234 RooAbsReal::logEvalError(nullptr, GetName().c_str(), "function value is NAN");
235 }
236 }
237}
238
239// --- RESULT LOGISTICS ---
240
242{
243 int numErrors = RooAbsReal::numEvalErrors();
244
245 if (numErrors) {
246 // Clear error list on local side
248 }
249
250 task_result_t task_result{id_, result_.Result(), result_.Carry(), numErrors > 0};
251 zmq::message_t message(sizeof(task_result_t));
252 memcpy(message.data(), &task_result, sizeof(task_result_t));
253 get_manager()->messenger().send_from_worker_to_master(std::move(message));
254}
255
256bool LikelihoodJob::receive_task_result_on_master(const zmq::message_t &message)
257{
258 auto task_result = message.data<task_result_t>();
259 results_.emplace_back(task_result->value, task_result->carry);
260 if (task_result->has_errors) {
261 RooAbsReal::logEvalError(nullptr, "LikelihoodJob", "evaluation errors at the worker processes", "no servervalue");
262 }
264 bool job_completed = (n_tasks_at_workers_ == 0);
265 return job_completed;
266}
267
268// --- END OF RESULT LOGISTICS ---
269
270void LikelihoodJob::evaluate_task(std::size_t task)
271{
272 assert(get_manager()->process_manager().is_worker());
273
274 double section_first = 0;
275 double section_last = 1;
276 if (getNEventTasks() > 1) {
277 std::size_t event_task = task % getNEventTasks();
278 std::size_t N_events = likelihood_->numDataEntries();
279 if (event_task > 0) {
280 std::size_t first = N_events * event_task / getNEventTasks();
281 section_first = static_cast<double>(first) / N_events;
282 }
283 if (event_task < getNEventTasks() - 1) {
284 std::size_t last = N_events * (event_task + 1) / getNEventTasks();
285 section_last = static_cast<double>(last) / N_events;
286 }
287 }
288
289 switch (likelihood_type_) {
292 result_ = likelihood_->evaluatePartition({section_first, section_last}, 0, 0);
293 if (do_offset_ && section_last == 1) {
294 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
296 }
297 break;
298 }
300 result_ = likelihood_->evaluatePartition({0, 1}, 0, 0);
303 }
304 break;
305 }
306 case LikelihoodType::sum: {
307 std::size_t components_first = 0;
308 std::size_t components_last = likelihood_->getNComponents();
309 if (getNComponentTasks() > 1) {
310 std::size_t component_task = task / getNEventTasks();
311 components_first = likelihood_->getNComponents() * component_task / getNComponentTasks();
312 if (component_task == getNComponentTasks() - 1) {
313 components_last = likelihood_->getNComponents();
314 } else {
315 components_last = likelihood_->getNComponents() * (component_task + 1) / getNComponentTasks();
316 }
317 }
318
320 RooNaNPacker packedNaN;
321 for (std::size_t comp_ix = components_first; comp_ix < components_last; ++comp_ix) {
322 auto component_result = likelihood_->evaluatePartition({section_first, section_last}, comp_ix, comp_ix + 1);
323 packedNaN.accumulate(component_result.Sum());
324 if (do_offset_ && section_last == 1 &&
326 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
327 result_ += (component_result - shared_offset_.offsets()[comp_ix]);
328 } else {
329 result_ += component_result;
330 }
331 }
332 if (packedNaN.getPayload() != 0) {
334 }
335
336 break;
337 }
338 }
339}
340
342{
346 printf("WARNING: when calling MinuitFcnGrad::setOffsetting after the run has already been started the "
347 "MinuitFcnGrad::likelihood_in_gradient object (a LikelihoodSerial) on the workers can no longer be "
348 "updated! This function (LikelihoodJob::enableOffsetting) can in principle be used outside of "
349 "MinuitFcnGrad, but be aware of this limitation. To do a minimization with a different offsetting "
350 "setting, please delete all RooFit::MultiProcess based objects so that the forked processes are killed "
351 "and then set up a new RooMinimizer.\n");
353 }
354}
355
356#define PROCESS_VAL(p) \
357 case (p): s = #p; break;
358
359std::ostream &operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
360{
361 std::string s;
362 switch (value) {
365 default: s = std::to_string(static_cast<int>(value));
366 }
367 return out << s;
368}
369
370#undef PROCESS_VAL
371
372} // namespace TestStatistics
373} // namespace RooFit
#define PROCESS_VAL(p)
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char mode
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
Definition Util.h:122
T Sum() const
Definition Util.h:240
T Result() const
Definition Util.h:245
T Carry() const
Definition Util.h:250
bool isConstant() const
Check if the "Constant" attribute is set.
Definition RooAbsArg.h:335
virtual void removeAll()
Remove all arguments from our set, deleting them if we own them.
virtual bool add(const RooAbsArg &var, bool silent=false)
Add the specified argument to list.
Storage_t::size_type size() const
virtual RooAbsArg * addClone(const RooAbsArg &var, bool silent=false)
Add a clone of the specified argument to list.
void setConstant(bool value=true)
Abstract base class for objects that represent a real value and implements functionality common to al...
Definition RooAbsReal.h:59
double getVal(const RooArgSet *normalisationSet=nullptr) const
Evaluate object.
Definition RooAbsReal.h:103
static Int_t numEvalErrors()
Return the number of logged evaluation errors since the last clearing.
void logEvalError(const char *message, const char *serverValueString=nullptr) const
Log evaluation error message.
static void clearEvalErrorLog()
Clear the stack of evaluation error messages.
RooArgList is a container object that can hold multiple RooAbsArg objects.
Definition RooArgList.h:22
RooAbsArg * at(Int_t idx) const
Return object at given index, or nullptr if index is out of range.
Definition RooArgList.h:110
ProcessManager & process_manager() const
std::size_t id_
Definition Job.h:45
std::size_t state_id_
Definition Job.h:46
JobManager * get_manager()
Get JobManager instance; create and activate if necessary.
Definition Job.cxx:112
void gather_worker_results()
Wait for all tasks to be retrieved for the current Job.
Definition Job.cxx:126
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
void send_from_worker_to_master(T &&item)
specialization that sends the final message
Definition Messenger.h:192
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
virtual void add(JobTask job_task)=0
Enqueue a task.
void enableOffsetting(bool flag) override
bool receive_task_result_on_master(const zmq::message_t &message) override
void evaluate_task(std::size_t task) override
std::vector< ROOT::Math::KahanSum< double > > results_
void send_back_task_result_from_worker(std::size_t task) override
void update_state() override
Virtual function to update any necessary state on workers.
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
SharedOffset::OffsetVec offsets_previous_
ROOT::Math::KahanSum< double > result_
LikelihoodJob(std::shared_ptr< RooAbsL > _likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
Virtual base class for implementation of likelihood calculation strategies.
Variable that can be changed from the outside.
Definition RooRealVar.h:37
void setVal(double value) override
Set value of variable to 'value'.
OffsetVec & offsets()
std::size_t State
Definition types.h:23
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26
Bool_t IsNaN(Double_t x)
Definition TMath.h:892
static constexpr std::size_t automaticNEventTasks
Definition Config.h:34
static constexpr std::size_t automaticNComponentTasks
Definition Config.h:35
Little struct that can pack a float into the unused bits of the mantissa of a NaN double.
float getPayload() const
Retrieve packed float.
double getNaNWithPayload() const
Retrieve a NaN with the current float payload packed into the mantissa.
void accumulate(double val)
Accumulate a packed float from another NaN into this.