Tomographer  v2.0
Tomographer C++ Framework Documentation
multiproc.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2015 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24  * SOFTWARE.
25  */
26 
27 #ifndef MULTIPROC_H
28 #define MULTIPROC_H
29 
30 #include <csignal>
31 
32 #include <string>
33 
35 
36 
37 
46 namespace Tomographer {
47 
48 namespace MultiProc {
49 
50 
51 
/**
 * \brief Basic status report class.
 *
 * Plain aggregate describing the state of one task: a progress value
 * (\c fraction_done) and a free-form message (\c msg).
 */
struct TaskStatusReport
{
  /** \brief Placeholder report: \c fraction_done is 0 and \c msg is \c "<unknown>". */
  TaskStatusReport()
    : fraction_done(0), msg("<unknown>")
  { }

  /**
   * \brief Report with explicit contents.
   *
   * \param fraction_done_  value stored in \c fraction_done
   * \param msg_            value stored in \c msg (taken by value and moved in)
   */
  TaskStatusReport(double fraction_done_, std::string msg_)
    : fraction_done(fraction_done_), msg(std::move(msg_))
  { }

  //! Progress value of the task, as supplied by whoever built the report.
  double fraction_done;
  //! Message accompanying the report.
  std::string msg;
};
74 
75 
80 template<typename TaskStatusReportType>
82 {
84  : num_completed(0),
85  num_total_runs(0),
86  workers_running(),
87  workers_reports()
88  {
89  }
90 
93 
96 
106 
116  std::vector<TaskStatusReportType,
118 };
119 
120 
121 
122 
123 namespace Sequential {
124 
145 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
146  typename LoggerType_, typename CountIntType_ = int>
148 {
149 public:
150  typedef TaskType_ TaskType;
151  typedef typename TaskType::StatusReportType TaskStatusReportType;
152  typedef TaskCData_ TaskCData;
153  typedef ResultsCollector_ ResultsCollector;
154  typedef LoggerType_ LoggerType;
155  typedef CountIntType_ CountIntType;
156 
157  // not directly needed, but make sure TaskType::ResultType exists as part of testing the
158  // task's, cdata's and results collector's correct type interface implementation
159  typedef typename TaskType::ResultType TaskResultType;
160 
162 
164 
165 private:
166 
167  const TaskCData * pcdata;
168  ResultsCollector * results;
169  LoggerType & logger;
170 
171  CountIntType num_total_runs;
172 
176  CountIntType task_k;
177 
178  struct TaskMgrIface {
179  TaskMgrIface(TaskDispatcher * dispatcher_)
180  : dispatcher(dispatcher_),
181  status_report_requested(false),
182  status_report_user_fn()
183  {
184  }
185 
186  TaskDispatcher * dispatcher;
187 
188  volatile std::sig_atomic_t status_report_requested;
189  FullStatusReportCallbackType status_report_user_fn;
190 
191  inline void _request_status_report() { status_report_requested = 1; }
192 
193  inline bool statusReportRequested() const
194  {
195  return status_report_requested;
196  }
197 
198  inline void submitStatusReport(const TaskStatusReportType &statreport)
199  {
201 
202  fullstatus.num_completed = dispatcher->task_k;
203  fullstatus.num_total_runs = dispatcher->num_total_runs;
204 
205  // initialize task-specific reports
206  // fill our lists with default-constructed values & set all running to false.
207  fullstatus.workers_running.clear();
208  fullstatus.workers_reports.clear();
209 
210  fullstatus.workers_running.resize(1, false);
211  fullstatus.workers_running[0] = true;
212 
213  fullstatus.workers_reports.resize(1, false);
214  fullstatus.workers_reports[0] = statreport;
215 
216  status_report_user_fn(fullstatus);
217 
218  status_report_requested = false;
219  }
220 
221  };
222 
223  TaskMgrIface mgriface;
224 
225 public:
226  TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
227  CountIntType num_total_runs_)
228  : pcdata(pcdata_), results(results_), logger(logger_), num_total_runs(num_total_runs_),
229  mgriface(this)
230  {
231  }
232 
233  void run()
234  {
235  results->init(num_total_runs, CountIntType(1), pcdata);
236 
237  logger.debug("MultiProc::Sequential::TaskDispatcher::run()", "preparing for sequential runs");
238 
239  for (task_k = 0; task_k < num_total_runs; ++task_k) {
240 
241  logger.debug("Tomographer::MultiProc::Sequential::TaskDispatcher::run()",
242  [&](std::ostream & stream) { stream << "Running task #" << task_k << " ..."; });
243 
244  auto input = pcdata->getTaskInput(task_k);
245 
246  // construct a new task instance
247  TaskType t(input, pcdata, logger);
248 
249  // and run it
250  t.run(pcdata, logger, &mgriface);
251 
252  // and collect the result
253  results->collectResult(task_k, t.getResult(), pcdata);
254  }
255 
256  results->runsFinished(num_total_runs, pcdata);
257  }
258 
259 
270  template<typename Fn>
271  inline void setStatusReportHandler(Fn fnstatus)
272  {
273  mgriface.status_report_user_fn = fnstatus;
274  }
275 
287  inline void requestStatusReport()
288  {
289  mgriface._request_status_report();
290  }
291 
292 }; // class TaskDispatcher
293 
294 
295 } // namespace Sequential
296 
297 } // namespace MultiProc
298 
299 } // namespace Tomographer
300 
301 #endif
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
Definition: multiproc.h:271
Base namespace for the Tomographer project.
Definition: densellh.h:44
void requestStatusReport()
Request a status report.
Definition: multiproc.h:287
Provide appropriate operator new() definitions for a structure which has a member of the given stored...
std::vector< TaskStatusReportType, typename Tools::NeedOwnOperatorNew< TaskStatusReportType >::AllocatorType > workers_reports
List with the raw report submitted from each individual thread.
Definition: multiproc.h:117
T resize(T...args)
STL class.
int num_total_runs
Total number of tasks to perform.
Definition: multiproc.h:95
std::vector< bool > workers_running
List specifying for each worker (e.g. a spawned thread) whether it is active or not.
Definition: multiproc.h:105
T clear(T...args)
Basic status report class.
Definition: multiproc.h:62
T move(T...args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Executes multiple tasks sequentially.
Definition: multiproc.h:147
STL class.
int num_completed
Number of completed tasks.
Definition: multiproc.h:92
A complete status report, abstract version.
Definition: multiproc.h:81