Loading [MathJax]/extensions/tex2jax.js
Tomographerv4.1
Tomographer C++ Framework Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
multiproc.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROC_H
29 #define MULTIPROC_H
30 
31 #include <csignal>
32 
33 #include <string>
34 #include <chrono>
35 #include <exception>
36 
37 #include <tomographer/tools/fmt.h>
39 
40 
41 
50 namespace Tomographer {
51 
52 namespace MultiProc {
53 
54 
55 
66 TOMOGRAPHER_EXPORT struct TaskStatusReport
67 {
69  : fraction_done(0), msg("<unknown>")
70  { }
71  TaskStatusReport(double fraction_done_, std::string msg_)
72  : fraction_done(fraction_done_), msg(std::move(msg_))
73  { }
74 
75  double fraction_done;
76  std::string msg;
77 };
78 
79 
84 template<typename TaskStatusReportType>
85 TOMOGRAPHER_EXPORT struct FullStatusReport
86 {
88  : num_completed(0),
89  num_total_runs(0),
90  workers_running(),
91  workers_reports(),
93  {
94  }
95 
98 
101 
111 
121  std::vector<TaskStatusReportType,
123 
127  double elapsed;
128 
129 
137  inline double totalFractionDone() const
138  {
139  // idea: cumulate in f one unit per task completed.
140  double f = num_completed;
141  for (std::size_t k = 0; k < workers_running.size(); ++k) {
142  if (workers_running[k]) {
143  // partially completed tasks contribute a fraction
144  f += workers_reports[k].fraction_done;
145  }
146  }
147  // and f goes from zero to num_total_runs
148  return f / num_total_runs;
149  }
150 
155  {
158  ss << "=========================== Intermediate Progress Report ============================\n"
159  << " "
160  << elapsed_s << "s elapsed"
161  << " - "
162  << num_completed << "/" << num_total_runs << " runs completed"
163  << " - "
164  << std::fixed << std::setw(5) << std::setprecision(2) << totalFractionDone() * 100.0 << "% total done"
165  << "\n";
166 
167  if (workers_running.size() == 0) {
168  // no info
169  } else if (workers_running.size() == 1) {
170  if (workers_running[0]) {
171  ss << "--> " << workers_reports[0].msg << "\n";
172  }
173  } else {
174  ss << "Current Run(s) information (workers working/spawned "
175  << (int)std::count(workers_running.begin(), workers_running.end(), true)
176  << "/" << workers_running.size() << "):\n";
177  for (std::size_t k = 0; k < workers_running.size(); ++k) {
178  ss << "=== " << std::setw(2) << k << ": ";
179  if (!workers_running[k]) {
180  ss << "<idle>\n";
181  } else {
182  ss << workers_reports[k].msg << "\n";
183  }
184  }
185  }
186  ss << "=====================================================================================\n";
187  return ss.str();
188  }
189 };
190 
191 
192 
193 TOMOGRAPHER_EXPORT class TasksInterruptedException : public std::exception
194 {
195  std::string msg_;
196 public:
197  TasksInterruptedException(std::string msg = "Tasks Interrupted.") : msg_(msg) { }
198  virtual ~TasksInterruptedException() throw() { }
199  const char * what() const throw() { return msg_.c_str(); }
200 };
201 
202 
203 
204 namespace Sequential {
205 
226 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
227  typename LoggerType_, typename CountIntType_ = int>
228 TOMOGRAPHER_EXPORT class TaskDispatcher
229 {
230 public:
231  typedef TaskType_ TaskType;
232  typedef typename TaskType::StatusReportType TaskStatusReportType;
233  typedef TaskCData_ TaskCData;
234  typedef ResultsCollector_ ResultsCollector;
235  typedef LoggerType_ LoggerType;
236  typedef CountIntType_ CountIntType;
237 
238  // not directly needed, but make sure TaskType::ResultType exists as part of testing the
239  // task, cdata and result-collectors's correct type interface implementation
240  typedef typename TaskType::ResultType TaskResultType;
241 
243 
245 
246 private:
247 
248  const TaskCData * pcdata;
249  ResultsCollector * results;
250  LoggerType & logger;
251 
252  CountIntType num_total_runs;
253 
257  CountIntType task_k;
258 
259  typedef
260 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
261  std::chrono::monotonic_clock // for GCC/G++ 4.6
262 #else
264 #endif
265  StdClockType;
266 
267 
268  struct TaskMgrIface {
269  TaskMgrIface(TaskDispatcher * dispatcher_)
270  : dispatcher(dispatcher_),
271  interrupt_requested(0),
272  status_report_requested(0),
273  status_report_user_fn(),
274  _tasks_start_time(StdClockType::now()),
275  _last_status_report(StdClockType::now()),
276  _status_report_periodic_interval(0)
277  {
278  }
279 
280  private:
281  TaskDispatcher * dispatcher;
282 
283  volatile std::sig_atomic_t interrupt_requested; // could be written to by signal handler
284  volatile std::sig_atomic_t status_report_requested; // could be written to by signal handler
285  FullStatusReportCallbackType status_report_user_fn;
286 
287  const StdClockType::time_point _tasks_start_time;
288  StdClockType::time_point _last_status_report;
289  StdClockType::duration _status_report_periodic_interval;
290 
291  friend class TaskDispatcher;
292 
293  inline void _request_status_report() {
294  status_report_requested = 1;
295  }
296  inline void _request_interrupt() {
297  interrupt_requested = 1;
298  }
299  inline void _request_periodic_status_report(int milliseconds) {
300  if ( milliseconds >= 0 ) {
301  _status_report_periodic_interval = std::chrono::duration_cast<StdClockType::duration>(
302  std::chrono::milliseconds(1+milliseconds)
303  );
304  } else {
305  _status_report_periodic_interval = StdClockType::duration(0);
306  }
307  }
308 
309  public:
310  inline bool statusReportRequested() const
311  {
312  if (interrupt_requested) {
314  }
315  if (_status_report_periodic_interval.count() > 0
316  && (StdClockType::now() - (_last_status_report + _status_report_periodic_interval)).count() > 0) {
317  return true;
318  }
319  return (bool) status_report_requested;
320  }
321 
322  inline void submitStatusReport(const TaskStatusReportType &statreport)
323  {
325 
326  fullstatus.num_completed = dispatcher->task_k;
327  fullstatus.num_total_runs = dispatcher->num_total_runs;
328 
329  // initialize task-specific reports
330  // fill our lists with default-constructed values & set all running to false.
331  fullstatus.workers_running.clear();
332  fullstatus.workers_reports.clear();
333 
334  fullstatus.workers_running.resize(1, false);
335  fullstatus.workers_running[0] = true;
336 
337  fullstatus.workers_reports.resize(1);
338  fullstatus.workers_reports[0] = statreport;
339 
341  StdClockType::now() - _tasks_start_time
342  ).count() * 1e-6;
343 
344  status_report_user_fn(fullstatus);
345 
346  status_report_requested = false;
347  _last_status_report = StdClockType::now();
348  }
349 
350  };
351 
352  TaskMgrIface mgriface;
353 
354 public:
355  TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
356  CountIntType num_total_runs_)
357  : pcdata(pcdata_), results(results_), logger(logger_), num_total_runs(num_total_runs_),
358  mgriface(this)
359  {
360  }
361 
366  void run()
367  {
368  results->init(num_total_runs, CountIntType(1), pcdata);
369 
370  logger.debug("MultiProc::Sequential::TaskDispatcher::run()", "preparing for sequential runs");
371 
372  for (task_k = 0; task_k < num_total_runs; ++task_k) {
373 
374  logger.debug("Tomographer::MultiProc::Sequential::TaskDispatcher::run()",
375  [&](std::ostream & stream) { stream << "Running task #" << task_k << " ..."; });
376 
377  auto input = pcdata->getTaskInput(task_k);
378 
379  // construct a new task instance
380  TaskType t(input, pcdata, logger);
381 
382  // and run it
383  t.run(pcdata, logger, &mgriface);
384 
385  // and collect the result
386  results->collectResult(task_k, t.getResult(), pcdata);
387  }
388 
389  results->runsFinished(num_total_runs, pcdata);
390  }
391 
392 
403  template<typename Fn>
404  inline void setStatusReportHandler(Fn fnstatus)
405  {
406  mgriface.status_report_user_fn = fnstatus;
407  }
408 
420  inline void requestStatusReport()
421  {
422  mgriface._request_status_report();
423  }
424 
432  inline void requestPeriodicStatusReport(int milliseconds)
433  {
434  mgriface._request_periodic_status_report(milliseconds);
435  }
436 
443  inline void requestInterrupt()
444  {
445  mgriface._request_interrupt();
446  }
447 
448 }; // class TaskDispatcher
449 
450 
451 } // namespace Sequential
452 
453 } // namespace MultiProc
454 
455 } // namespace Tomographer
456 
457 #endif
Utilities for formatting strings.
void setStatusReportHandler(Fn fnstatus)
assign a callable to be called whenever a status report is requested
Definition: multiproc.h:404
Base namespace for the Tomographer project.
Definition: densellh.h:45
void requestInterrupt()
Interrupt all tasks as soon as possible.
Definition: multiproc.h:443
void requestStatusReport()
Request a status report.
Definition: multiproc.h:420
Provide appropriate operator new() definitions for a structure which has a member of the given stored...
std::vector< TaskStatusReportType, typename Tools::NeedOwnOperatorNew< TaskStatusReportType >::AllocatorType > workers_reports
List with the raw report submitted from each individual thread.
Definition: multiproc.h:122
T duration_cast(T... args)
double totalFractionDone() const
The total fraction of the job completed.
Definition: multiproc.h:137
T end(T... args)
std::string fmtDuration(double seconds)
Format a number of seconds into a human-readable string.
Definition: fmt.h:367
T setw(T... args)
T resize(T... args)
STL class.
double elapsed
Number of seconds elapsed since launching the tasks.
Definition: multiproc.h:127
int num_total_runs
Total number of tasks to perform.
Definition: multiproc.h:100
T str(T... args)
std::vector< bool > workers_running
List specifying for each worker (e.g. a spawned thread) whether it is active or not.
Definition: multiproc.h:110
T clear(T... args)
Basic status report class.
Definition: multiproc.h:66
STL class.
T move(T... args)
T count(T... args)
T fixed(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
T begin(T... args)
void requestPeriodicStatusReport(int milliseconds)
Request a status report periodically.
Definition: multiproc.h:432
Executes multiple tasks sequentially.
Definition: multiproc.h:228
std::string getHumanReport() const
Produce a text-based human-readable short representation of the status report.
Definition: multiproc.h:154
T setprecision(T... args)
STL class.
int num_completed
Number of completed tasks.
Definition: multiproc.h:97
A complete status report, abstract version.
Definition: multiproc.h:85