Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
LikelihoodJob.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 *
6 * Copyright (c) 2021, CERN
7 *
8 * Redistribution and use in source and binary forms,
9 * with or without modification, are permitted according to the terms
10 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
11 */
12
13#include "LikelihoodJob.h"
14
15#include "LikelihoodSerial.h"
27#include "RooRealVar.h"
28
29namespace RooFit {
30namespace TestStatistics {
31
32LikelihoodJob::LikelihoodJob(std::shared_ptr<RooAbsL> likelihood,
33 std::shared_ptr<WrapperCalculationCleanFlags> calculation_is_clean, SharedOffset offset)
34 : LikelihoodWrapper(std::move(likelihood), std::move(calculation_is_clean), std::move(offset)),
35 n_event_tasks_(MultiProcess::Config::LikelihoodJob::defaultNEventTasks),
36 n_component_tasks_(MultiProcess::Config::LikelihoodJob::defaultNComponentTasks),
37 likelihood_serial_(likelihood_, calculation_is_clean_, shared_offset_)
38{
39 init_vars();
41}
42
43// This is a separate function (instead of just in ctor) for historical reasons.
44// Its predecessor RooRealMPFE::initVars() was used from multiple ctors, but also
45// from RooRealMPFE::constOptimizeTestStatistic at the end, which makes sense,
46// because it might change the set of variables. We may at some point want to do
47// this here as well.
49{
50 // Empty current lists
53
54 // Retrieve non-constant parameters
55 std::unique_ptr<RooArgSet> vars{likelihood_->getParameters()};
56 // TODO: make sure this is the right list of parameters, compare to original
57 // implementation in RooRealMPFE.cxx
58
59 RooArgList varList(*vars);
60
61 // Save in lists
62 vars_.add(varList);
63 save_vars_.addClone(varList);
64}
65
67{
68 if (get_manager()->process_manager().is_worker()) {
69 bool more;
70
72 assert(more);
73
74 switch (mode) {
77 assert(more);
78 auto message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
79 auto message_begin = message.data<update_state_t>();
80 auto message_end = message_begin + message.size() / sizeof(update_state_t);
81 std::vector<update_state_t> to_update(message_begin, message_end);
82 for (auto const &item : to_update) {
83 RooRealVar *rvar = static_cast<RooRealVar *>(vars_.at(item.var_index));
84 rvar->setVal(static_cast<double>(item.value));
85 if (rvar->isConstant() != item.is_constant) {
86 rvar->setConstant(static_cast<bool>(item.is_constant));
87 }
88 }
89
90 if (more) {
91 // offsets also incoming
92 auto offsets_message = get_manager()->messenger().receive_from_master_on_worker<zmq::message_t>(&more);
93 assert(!more);
94 auto offsets_message_begin = offsets_message.data<ROOT::Math::KahanSum<double>>();
95 std::size_t N_offsets = offsets_message.size() / sizeof(ROOT::Math::KahanSum<double>);
96 shared_offset_.offsets().reserve(N_offsets);
97 auto offsets_message_end = offsets_message_begin + N_offsets;
98 std::copy(offsets_message_begin, offsets_message_end, shared_offset_.offsets().begin());
99 }
100
101 break;
102 }
104 LikelihoodWrapper::enableOffsetting(get_manager()->messenger().receive_from_master_on_worker<bool>(&more));
105 assert(!more);
106 break;
107 }
108 }
109 }
110}
111
112/// \warning In automatic mode, this function can start MultiProcess (forks, starts workers, etc)!
114{
115 std::size_t val = n_event_tasks_;
118 }
119 if (val > likelihood_->getNEvents()) {
120 val = likelihood_->getNEvents();
121 }
122 return val;
123}
124
126{
127 std::size_t val = n_component_tasks_;
129 val = 1;
130 }
131 if (val > likelihood_->getNComponents()) {
132 val = likelihood_->getNComponents();
133 }
134 return val;
135}
136
138{
139 if (get_manager()->process_manager().is_master()) {
140 bool valChanged = false;
141 bool constChanged = false;
142 std::vector<update_state_t> to_update;
143 for (std::size_t ix = 0u; ix < static_cast<std::size_t>(vars_.size()); ++ix) {
144 valChanged = !vars_[ix].isIdentical(save_vars_[ix], true);
145 constChanged = (vars_[ix].isConstant() != save_vars_[ix].isConstant());
146
147 if (valChanged || constChanged) {
148 if (constChanged) {
149 (static_cast<RooRealVar *>(&save_vars_[ix]))->setConstant(vars_[ix].isConstant());
150 }
151 // TODO: Check with Wouter why he uses copyCache in MPFE; makes it very difficult to extend, because
152 // copyCache is protected (so must be friend). Moved setting value to if-block below.
153 // _saveVars[ix].copyCache(&_vars[ix]);
154
155 // send message to queue (which will relay to workers)
156 RooAbsReal *rar_val = dynamic_cast<RooAbsReal *>(&vars_[ix]);
157 if (rar_val) {
158 double val = rar_val->getVal();
159 dynamic_cast<RooRealVar *>(&save_vars_[ix])->setVal(val);
160 bool isC = vars_[ix].isConstant();
161 to_update.push_back(update_state_t{ix, val, isC});
162 }
163 }
164 }
165 bool update_offsets = isOffsetting() && shared_offset_.offsets() != offsets_previous_;
166 if (!to_update.empty() || update_offsets) {
167 ++state_id_;
168 zmq::message_t message(to_update.begin(), to_update.end());
169 // always send Job id first! This is used in worker_loop to route the
170 // update_state call to the correct Job.
171 if (update_offsets) {
172 zmq::message_t offsets_message(shared_offset_.offsets().begin(), shared_offset_.offsets().end());
174 std::move(message), std::move(offsets_message));
176 } else {
178 std::move(message));
179 }
180 }
181 }
182}
183
185{
187}
188
190{
191 if (get_manager()->process_manager().is_master()) {
192 // evaluate the serial likelihood to set the offsets
193 if (do_offset_ && shared_offset_.offsets().empty()) {
195 // note: we don't need to get the offsets from the serial likelihood, because they are already coupled through
196 // the shared_ptr
197 }
198
199 // update parameters that changed since last calculation (or creation if first time)
201
202 // master fills queue with tasks
203 auto N_tasks = getNEventTasks() * getNComponentTasks();
204 for (std::size_t ix = 0; ix < N_tasks; ++ix) {
205 get_manager()->queue()->add({id_, state_id_, ix});
206 }
207 n_tasks_at_workers_ = N_tasks;
208
209 // wait for task results back from workers to master
211
212 // Note: initializing result_ to results_[0] instead of zero-initializing it makes
213 // a difference due to Kahan sum precision. This way, a single-worker run gives
214 // the same result as a run with serial likelihood. Adding the terms to a zero
215 // initial sum can cancel the carry in some cases, causing divergent values.
216 result_ = results_[0];
217 for (auto item_it = results_.cbegin() + 1; item_it != results_.cend(); ++item_it) {
218 result_ += *item_it;
219 }
220 results_.clear();
221 }
222}
223
224// --- RESULT LOGISTICS ---
225
227{
228 task_result_t task_result{id_, result_.Result(), result_.Carry()};
229 zmq::message_t message(sizeof(task_result_t));
230 memcpy(message.data(), &task_result, sizeof(task_result_t));
231 get_manager()->messenger().send_from_worker_to_master(std::move(message));
232}
233
234bool LikelihoodJob::receive_task_result_on_master(const zmq::message_t &message)
235{
236 auto task_result = message.data<task_result_t>();
237 results_.emplace_back(task_result->value, task_result->carry);
239 bool job_completed = (n_tasks_at_workers_ == 0);
240 return job_completed;
241}
242
243// --- END OF RESULT LOGISTICS ---
244
245void LikelihoodJob::evaluate_task(std::size_t task)
246{
247 assert(get_manager()->process_manager().is_worker());
248
249 double section_first = 0;
250 double section_last = 1;
251 if (getNEventTasks() > 1) {
252 std::size_t event_task = task % getNEventTasks();
253 std::size_t N_events = likelihood_->numDataEntries();
254 if (event_task > 0) {
255 std::size_t first = N_events * event_task / getNEventTasks();
256 section_first = static_cast<double>(first) / N_events;
257 }
258 if (event_task < getNEventTasks() - 1) {
259 std::size_t last = N_events * (event_task + 1) / getNEventTasks();
260 section_last = static_cast<double>(last) / N_events;
261 }
262 }
263
264 switch (likelihood_type_) {
267 result_ = likelihood_->evaluatePartition({section_first, section_last}, 0, 0);
268 if (do_offset_ && section_last == 1) {
269 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
271 }
272 break;
273 }
275 result_ = likelihood_->evaluatePartition({0, 1}, 0, 0);
278 }
279 break;
280 }
281 case LikelihoodType::sum: {
282 std::size_t components_first = 0;
283 std::size_t components_last = likelihood_->getNComponents();
284 if (getNComponentTasks() > 1) {
285 std::size_t component_task = task / getNEventTasks();
286 components_first = likelihood_->getNComponents() * component_task / getNComponentTasks();
287 if (component_task == getNComponentTasks() - 1) {
288 components_last = likelihood_->getNComponents();
289 } else {
290 components_last = likelihood_->getNComponents() * (component_task + 1) / getNComponentTasks();
291 }
292 }
293
295 for (std::size_t comp_ix = components_first; comp_ix < components_last; ++comp_ix) {
296 auto component_result = likelihood_->evaluatePartition({section_first, section_last}, comp_ix, comp_ix + 1);
297 if (do_offset_ && section_last == 1 &&
299 // we only subtract at the end of event sections, otherwise the offset is subtracted for each event split
300 result_ += (component_result - shared_offset_.offsets()[comp_ix]);
301 } else {
302 result_ += component_result;
303 }
304 }
305
306 break;
307 }
308 }
309}
310
312{
316 printf("WARNING: when calling MinuitFcnGrad::setOffsetting after the run has already been started the "
317 "MinuitFcnGrad::likelihood_in_gradient object (a LikelihoodSerial) on the workers can no longer be "
318 "updated! This function (LikelihoodJob::enableOffsetting) can in principle be used outside of "
319 "MinuitFcnGrad, but be aware of this limitation. To do a minimization with a different offsetting "
320 "setting, please delete all RooFit::MultiProcess based objects so that the forked processes are killed "
321 "and then set up a new RooMinimizer.\n");
323 }
324}
325
326#define PROCESS_VAL(p) \
327 case (p): s = #p; break;
328
329std::ostream &operator<<(std::ostream &out, const LikelihoodJob::update_state_mode value)
330{
331 std::string s;
332 switch (value) {
335 default: s = std::to_string(static_cast<int>(value));
336 }
337 return out << s;
338}
339
340#undef PROCESS_VAL
341
342} // namespace TestStatistics
343} // namespace RooFit
#define PROCESS_VAL(p)
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char mode
The Kahan summation is a compensated summation algorithm, which significantly reduces numerical error...
Definition Util.h:122
T Result() const
Definition Util.h:245
T Carry() const
Definition Util.h:250
bool isConstant() const
Check if the "Constant" attribute is set.
Definition RooAbsArg.h:361
virtual void removeAll()
Remove all arguments from our set, deleting them if we own them.
virtual bool add(const RooAbsArg &var, bool silent=false)
Add the specified argument to list.
Storage_t::size_type size() const
virtual RooAbsArg * addClone(const RooAbsArg &var, bool silent=false)
Add a clone of the specified argument to list.
void setConstant(bool value=true)
Abstract base class for objects that represent a real value and implements functionality common to al...
Definition RooAbsReal.h:59
double getVal(const RooArgSet *normalisationSet=nullptr) const
Evaluate object.
Definition RooAbsReal.h:103
RooArgList is a container object that can hold multiple RooAbsArg objects.
Definition RooArgList.h:22
RooAbsArg * at(Int_t idx) const
Return object at given index, or nullptr if index is out of range.
Definition RooArgList.h:110
ProcessManager & process_manager() const
std::size_t id_
Definition Job.h:45
std::size_t state_id_
Definition Job.h:46
JobManager * get_manager()
Get JobManager instance; create and activate if necessary.
Definition Job.cxx:112
void gather_worker_results()
Wait for all tasks to be retrieved for the current Job.
Definition Job.cxx:126
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
void publish_from_master_to_workers(T &&item)
specialization that sends the final message
Definition Messenger.h:150
virtual void add(JobTask job_task)=0
Enqueue a task.
void enableOffsetting(bool flag) override
bool receive_task_result_on_master(const zmq::message_t &message) override
void evaluate_task(std::size_t task) override
std::vector< ROOT::Math::KahanSum< double > > results_
void send_back_task_result_from_worker(std::size_t task) override
void update_state() override
Virtual function to update any necessary state on workers.
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
SharedOffset::OffsetVec offsets_previous_
ROOT::Math::KahanSum< double > result_
LikelihoodJob(std::shared_ptr< RooAbsL > _likelihood, std::shared_ptr< WrapperCalculationCleanFlags > calculation_is_clean, SharedOffset offset)
void evaluate() override
Triggers (possibly asynchronous) evaluation of the likelihood.
Virtual base class for implementation of likelihood calculation strategies.
Variable that can be changed from the outside.
Definition RooRealVar.h:37
void setVal(double value) override
Set value of variable to 'value'.
OffsetVec & offsets()
std::size_t State
Definition types.h:23
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26
static constexpr std::size_t automaticNEventTasks
Definition Config.h:34
static constexpr std::size_t automaticNComponentTasks
Definition Config.h:35