Tomographer  v5.4
Tomographer C++ Framework Documentation
multiproc.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROC_H
29 #define MULTIPROC_H
30 
31 #include <csignal>
32 
33 #include <string>
34 #include <chrono>
35 #include <exception>
36 
37 // task status reports etc. can be serialized with boost::serialization
38 #include <boost/serialization/serialization.hpp>
39 
40 #include <tomographer/tools/fmt.h>
42 
43 
44 
53 namespace Tomographer {
54 
55 namespace MultiProc {
56 
57 
58 
/**
 * Basic status report class: a completion fraction plus a free-form message.
 *
 * Both members are public.  The private serialize() member is made reachable
 * by boost::serialization::access and streams the two fields in declaration
 * order.
 */
69 struct TOMOGRAPHER_EXPORT TaskStatusReport
70 {
// NOTE(review): listing line 71 is missing from this rendering.  The
// initializer list below presumably belongs to a default constructor, which
// would yield fraction_done == 0 and msg == "<unknown>" -- confirm against the
// original header.
72  : fraction_done(0), msg("<unknown>")
73  { }
// Construct a report from an explicit fraction and message.  The message is
// taken by value and moved into the member.
74  TaskStatusReport(double fraction_done_, std::string msg_)
75  : fraction_done(fraction_done_), msg(std::move(msg_))
76  { }
77 
// Fraction of the task that is done.  Presumably in the range [0, 1]; nothing
// in this struct enforces that -- TODO confirm with the task implementations.
78  double fraction_done;
// Free-form text describing the current state of the task.
79  std::string msg;
80 
81 private:
82  friend class boost::serialization::access;
// Serialization hook: the same member handles both saving and loading, the
// archive's operator& decides the direction.  The version argument is unused.
83  template<typename Archive>
84  void serialize(Archive & a, unsigned int /*version*/)
85  {
86  a & fraction_done;
87  a & msg;
88  }
89 };
90 
91 
/**
 * A complete status report, abstract version.
 *
 * Aggregates the overall progress counters with one entry per worker.
 *
 * \tparam TaskStatusReportType  per-worker report type; the code below reads
 *         its `fraction_done` and `msg` members.
 * \tparam TaskCountIntType      integer type used for the task counters.
 */
96 template<typename TaskStatusReportType, typename TaskCountIntType>
97 struct TOMOGRAPHER_EXPORT FullStatusReport
98 {
// NOTE(review): listing line 99 is missing from this rendering.  The
// initializer list below presumably belongs to a default constructor --
// confirm against the original header.
100  : num_completed(0),
101  num_total_runs(0),
102  workers_running(),
103  workers_reports(),
// NOTE(review): listing line 104 is missing as well.  The trailing comma above
// implies one more initializer, presumably for `elapsed` -- confirm.
105  {
106  }
107 
// Number of completed tasks.
109  TaskCountIntType num_completed;
110 
// Total number of tasks to perform.
112  TaskCountIntType num_total_runs;
113 
// NOTE(review): the declaration of `workers_running` (listing line 122) is
// missing here.  The documentation index of this page gives it as
// `std::vector<bool> workers_running`: for each worker (e.g. a spawned
// thread), whether it is active or not.
123 
// List with the raw report submitted from each individual worker.
// NOTE(review): the declaration is cut off after the first template argument
// (listing line 134 is missing).  The documentation index gives the full type
// as std::vector<TaskStatusReportType,
//   typename Tools::NeedOwnOperatorNew<TaskStatusReportType>::AllocatorType>
// with the member name `workers_reports`.
133  std::vector<TaskStatusReportType,
135 
// Number of seconds elapsed since launching the tasks.
139  double elapsed;
140 
141 
// The total fraction of the job completed.
//
// Each completed task counts as one unit, each running worker contributes the
// `fraction_done` of its report, and the sum is divided by num_total_runs.
//
// workers_reports[k] is read for every k with workers_running[k] set, so
// workers_reports must be at least as long as workers_running.  There is no
// guard against num_total_runs being zero.
149  inline double totalFractionDone() const
150  {
151  // idea: cumulate in f one unit per task completed.
152  double f = num_completed;
153  for (std::size_t k = 0; k < workers_running.size(); ++k) {
154  if (workers_running[k]) {
155  // partially completed tasks contribute a fraction
156  f += workers_reports[k].fraction_done;
157  }
158  }
159  // and f goes from zero to num_total_runs
160  return f / num_total_runs;
161  }
162 
// Produce a text-based human-readable short representation of the status
// report.
//
// Layout: a banner line, one summary line (elapsed time, completed/total runs,
// percentage from totalFractionDone()), then per-worker information, then a
// closing rule.  With exactly one worker only its message is printed (and only
// if it is running).  With several workers each one gets its own line, idle
// workers being shown as "<idle>".
//
// NOTE(review): the signature (listing line 166) is missing here.  The
// documentation index gives it as `std::string getHumanReport() const`.
// Listing lines 168, 169 and 172 are missing too; they presumably define
// `elapsed_s` (a formatted duration built from `elapsed`) and the string
// stream `ss` -- confirm against the original header.
167  {
170  (std::chrono::milliseconds::rep)(elapsed*1000)
171  ));
173  ss << "=========================== Intermediate Progress Report ============================\n"
174  << " "
175  << elapsed_s << "s elapsed"
176  << " - "
177  << num_completed << "/" << num_total_runs << " runs completed"
178  << " - "
179  << std::fixed << std::setw(5) << std::setprecision(2) << totalFractionDone() * 100.0 << "% total done"
180  << "\n";
181 
182  if (workers_running.size() == 0) {
183  // no info
184  } else if (workers_running.size() == 1) {
185  if (workers_running[0]) {
186  ss << "--> " << workers_reports[0].msg << "\n";
187  }
188  } else {
189  ss << "Current Run(s) information (workers working/spawned "
190  << std::count(workers_running.begin(), workers_running.end(), true)
191  << "/" << workers_running.size() << "):\n";
192  for (std::size_t k = 0; k < workers_running.size(); ++k) {
193  ss << "=== " << std::setw(2) << k << ": ";
194  if (!workers_running[k]) {
195  ss << "<idle>\n";
196  } else {
197  ss << workers_reports[k].msg << "\n";
198  }
199  }
200  }
201  ss << "=====================================================================================\n";
202  return ss.str();
203  }
204 
205 
206 private:
207  friend class boost::serialization::access;
// Serialization hook: streams all five data members through the archive's
// operator&, in the order listed.  The version argument is unused.
208  template<typename Archive>
209  void serialize(Archive & a, unsigned int /*version*/)
210  {
211  a & num_completed;
212  a & num_total_runs;
213  a & workers_running;
214  a & workers_reports;
215  a & elapsed;
216  }
217 };
218 
219 
220 
/**
 * Exception type carrying a "tasks interrupted" message.
 *
 * The message given at construction (default: "Tasks Interrupted.") is stored
 * in the object and returned verbatim by what().
 */
class TOMOGRAPHER_EXPORT TasksInterruptedException : public std::exception
{
  std::string msg_;
public:
  /**
   * \param msg  human-readable description; taken by value and moved into the
   *             exception object so that no second copy of the string is made.
   */
  TasksInterruptedException(std::string msg = "Tasks Interrupted.") : msg_(std::move(msg)) { }

  // noexcept replaces the dynamic exception specification throw(), which is
  // deprecated since C++11 and no longer part of the language as of C++20.
  virtual ~TasksInterruptedException() noexcept { }

  /**
   * \return the stored message as a C string.  The pointer remains valid as
   *         long as this exception object is neither destroyed nor assigned to.
   */
  const char * what() const noexcept override { return msg_.c_str(); }
};
229 
230 
231 
232 namespace Sequential {
233 
/**
 * Executes multiple tasks sequentially.
 *
 * run() creates one TaskType instance per task index, runs it in the calling
 * thread and stores a heap-allocated copy of its result.  Each task receives a
 * pointer to a small manager interface (TaskMgrIface) through which it can ask
 * whether a status report is due and submit one.
 *
 * \tparam TaskType_          task class; must provide StatusReportType and
 *                            ResultType typedefs, a (input, pcdata, logger)
 *                            constructor, run(pcdata, logger, mgriface) and
 *                            stealResult(), as used in run() below.
 * \tparam TaskCData_         shared constant data; must provide
 *                            getTaskInput(k).
 * \tparam LoggerType_        logger; debug() and longdebug() are called on it.
 * \tparam TaskCountIntType_  integer type used to count tasks (default int).
 *
 * NOTE(review): this class owns raw pointers (deleted in the destructor) and
 * its TaskMgrIface member stores `this`, yet no copy/move control is visible
 * in this listing.  A copied dispatcher would double-delete and point back at
 * the source object -- confirm that copies are never made.
 */
253 template<typename TaskType_, typename TaskCData_,
254  typename LoggerType_, typename TaskCountIntType_ = int>
255 class TOMOGRAPHER_EXPORT TaskDispatcher
256 {
257 public:
258  typedef TaskType_ TaskType;
259  typedef typename TaskType::StatusReportType TaskStatusReportType;
260  typedef TaskCData_ TaskCData;
261  typedef LoggerType_ LoggerType;
262  typedef TaskCountIntType_ TaskCountIntType;
263 
264  // not directly needed, but make sure TaskType::ResultType exists as part of testing the
265  // correct type interface implementation of the task, cdata and result collectors
266  typedef typename TaskType::ResultType TaskResultType;
267 
// NOTE(review): listing lines 268 and 270 are missing from this rendering.
// They presumably declare FullStatusReportType and
// FullStatusReportCallbackType, both of which are used further down --
// confirm against the original header.
269 
271 
272 private:
273 
// Shared constant data handed to every task.  Not owned by the dispatcher.
274  const TaskCData * pcdata;
// NOTE(review): listing line 275 is missing.  It presumably declares
// `results`, which the index describes as std::vector<TaskResultType*>.
276  LoggerType & logger;
277 
// Number of tasks that run() will execute.
278  TaskCountIntType num_total_runs;
279 
// Index of the task currently being run; also reported as the number of
// completed tasks in submitStatusReport().
283  TaskCountIntType task_k;
284 
// Clock used for elapsed-time and periodic-report bookkeeping.
// NOTE(review): the #else alternative (listing line 289) is missing.
285  typedef
286 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
287  std::chrono::monotonic_clock // for GCC/G++ 4.6
288 #else
290 #endif
291  StdClockType;
292 
293 
// Interface object handed to each task (see run()): tasks poll
// statusReportRequested() and answer with submitStatusReport().  The request
// setters are private and reached by TaskDispatcher through friendship.
294  struct TaskMgrIface {
295  TaskMgrIface(TaskDispatcher * dispatcher_)
296  : dispatcher(dispatcher_),
297  interrupt_requested(0),
298  status_report_requested(0),
299  status_report_user_fn(),
300  _tasks_start_time(StdClockType::now()),
301  _last_status_report(StdClockType::now()),
302  _status_report_periodic_interval(0)
303  {
304  }
305 
306  private:
// Back-pointer to the owning dispatcher, read for task_k and num_total_runs.
307  TaskDispatcher * dispatcher;
308 
309  volatile std::sig_atomic_t interrupt_requested; // could be written to by signal handler
310  volatile std::sig_atomic_t status_report_requested; // could be written to by signal handler
// User callback invoked with each full status report.
311  FullStatusReportCallbackType status_report_user_fn;
312 
// Time at which this interface object was constructed; used as the origin
// for the `elapsed` field of status reports.
313  const StdClockType::time_point _tasks_start_time;
// Time of the last submitted status report (initially construction time).
314  StdClockType::time_point _last_status_report;
// Period between automatic status reports; zero means no periodic reports.
315  StdClockType::duration _status_report_periodic_interval;
316 
317  friend class TaskDispatcher;
318 
319  inline void _request_status_report() {
320  status_report_requested = 1;
321  }
322  inline void _request_interrupt() {
323  interrupt_requested = 1;
324  }
// Non-negative argument: enable periodic reports with a period of
// (1 + milliseconds) ms, so that the stored interval is never zero.
// Negative argument: set the interval to zero, which disables them.
325  template<typename IntType>
326  inline void _request_periodic_status_report(IntType milliseconds) {
327  if ( milliseconds >= 0 ) {
328  _status_report_periodic_interval = std::chrono::duration_cast<StdClockType::duration>(
329  std::chrono::milliseconds(1+milliseconds)
330  );
331  } else {
332  _status_report_periodic_interval = StdClockType::duration(0);
333  }
334  }
335 
336  public:
// Returns true when the task should submit a status report now: either the
// periodic interval has elapsed since the last report, or a one-off report
// was requested.
// NOTE(review): the body of the interrupt branch (listing line 340) is
// missing from this rendering; it presumably throws
// TasksInterruptedException -- confirm against the original header.
337  inline bool statusReportRequested() const
338  {
339  if (interrupt_requested) {
341  }
342  if (_status_report_periodic_interval.count() > 0
343  && (StdClockType::now() - (_last_status_report + _status_report_periodic_interval)).count() > 0) {
344  return true;
345  }
346  return (bool) status_report_requested;
347  }
348 
// Wraps the task's report into a single-worker full status report, passes it
// to the user callback, then clears the one-off request flag and restarts
// the periodic timer.
// NOTE(review): the callback is invoked unconditionally.  If no handler has
// been set and the callback type is std::function-like, this call would
// fail -- the callback type is not visible here, confirm.
349  inline void submitStatusReport(const TaskStatusReportType &statreport)
350  {
351  FullStatusReportType fullstatus;
352 
353  fullstatus.num_completed = dispatcher->task_k;
354  fullstatus.num_total_runs = dispatcher->num_total_runs;
355 
356  // initialize task-specific reports
357  // fill our lists with default-constructed values & set all running to false.
358  fullstatus.workers_running.clear();
359  fullstatus.workers_reports.clear();
360 
// Sequential execution: exactly one worker, and it is the one reporting.
361  fullstatus.workers_running.resize(1, false);
362  fullstatus.workers_running[0] = true;
363 
364  fullstatus.workers_reports.resize(1);
365  fullstatus.workers_reports[0] = statreport;
366 
// NOTE(review): listing line 367 is missing.  The statement presumably
// assigns fullstatus.elapsed from a duration cast to microseconds, given the
// 1e-6 scale factor -- confirm.
368  StdClockType::now() - _tasks_start_time
369  ).count() * 1e-6;
370 
371  status_report_user_fn(std::move(fullstatus));
372 
373  status_report_requested = false;
374  _last_status_report = StdClockType::now();
375  }
376 
377  };
378 
379  TaskMgrIface mgriface;
380 
381 public:
// Stores the shared data pointer, the logger reference and the number of
// tasks.  No task is run until run() is called.
382  TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_, TaskCountIntType num_total_runs_)
383  : pcdata(pcdata_), results(), logger(logger_), num_total_runs(num_total_runs_),
384  mgriface(this)
385  {
386  }
// Frees every result object allocated by run().
387  ~TaskDispatcher() {
388  for ( auto r : results ) {
389  if (r != NULL) {
390  delete r;
391  }
392  }
393  }
394 
// Runs all tasks one after the other in the calling thread.  For each index k
// in [0, num_total_runs): fetch the input with pcdata->getTaskInput(k),
// construct a task, run it, and store a heap-allocated copy of
// t.stealResult() in results[k].
// NOTE(review): calling run() a second time would overwrite the stored
// pointers without deleting them -- confirm that it is only called once.
399  void run()
400  {
401  results.resize((std::size_t)num_total_runs, NULL);
402 
403  logger.debug("MultiProc::Sequential::TaskDispatcher::run()", "preparing for sequential runs");
404 
405  for (task_k = 0; task_k < num_total_runs; ++task_k) {
406 
407  logger.debug("Tomographer::MultiProc::Sequential::TaskDispatcher::run()",
408  [&](std::ostream & stream) { stream << "Running task #" << task_k << " ..."; });
409 
410  auto input = pcdata->getTaskInput(task_k);
411 
412  // construct a new task instance
413  TaskType t(input, pcdata, logger);
414 
415  // and run it
416  t.run(pcdata, logger, &mgriface);
417 
418  // and collect the result
419  logger.longdebug("MultiProc::Sequential::TaskDispatcher::run()", "collecting result");
420  results[(std::size_t)task_k] = new TaskResultType(t.stealResult());
421  }
422 
423  // all done
424  logger.debug("MultiProc::Sequential::TaskDispatcher::run()", "all done");
425  }
426 
// The total number of task instances that were run (the value given at
// construction).
430  inline TaskCountIntType numTaskRuns() const { return num_total_runs; }
431 
// Returns the results of all the tasks.
// NOTE(review): the signature (listing line 435) is missing here.  The
// documentation index gives it as
// `const std::vector<TaskResultType*> & collectedTaskResults() const`.
436  return results;
437  }
438 
// Returns the result of the given task.  No bounds or null check is made: k
// must be a valid index whose task has already been run.
442  inline const TaskResultType & collectedTaskResult(std::size_t k) const { return *results[k]; }
443 
444 
// Assign a callable to be called whenever a status report is requested.  It
// is stored in the manager interface and invoked by submitStatusReport() with
// the full status report.
455  template<typename Fn>
456  inline void setStatusReportHandler(Fn fnstatus)
457  {
458  mgriface.status_report_user_fn = fnstatus;
459  }
460 
// Request a status report.  Only sets a flag; the report is produced when the
// running task next polls statusReportRequested().
472  inline void requestStatusReport()
473  {
474  mgriface._request_status_report();
475  }
476 
// Request a status report periodically.  A negative argument disables
// periodic reports; see _request_periodic_status_report().
484  template<typename IntType>
485  inline void requestPeriodicStatusReport(IntType milliseconds)
486  {
487  mgriface._request_periodic_status_report(milliseconds);
488  }
489 
// Interrupt all tasks as soon as possible.  Only sets a flag, which the
// running task sees on its next call to statusReportRequested().
496  inline void requestInterrupt()
497  {
498  mgriface._request_interrupt();
499  }
500 
501 }; // class TaskDispatcher
502 
503 
504 
/**
 * Helper that builds a dispatcher from (pcdata_, logger_, num_total_runs_).
 * Declared in namespace Sequential.
 *
 * NOTE(review): listing lines 507 (the return type) and 510 (the start of the
 * return statement) are missing from this rendering.  The function presumably
 * returns a
 * TaskDispatcher<TaskType_, TaskCData_, LoggerType_, TaskCountIntType_>
 * constructed from the three arguments -- confirm against the original header.
 * TaskType_ does not appear in the parameter list, so callers have to name it
 * explicitly.
 */
505 template<typename TaskType_, typename TaskCData_,
506  typename LoggerType_, typename TaskCountIntType_ = int>
508 mkTaskDispatcher(TaskCData_ * pcdata_, LoggerType_ & logger_, TaskCountIntType_ num_total_runs_)
509 {
511  pcdata_, logger_, num_total_runs_
512  );
513 }
514 
515 
516 
517 
518 
519 } // namespace Sequential
520 } // namespace MultiProc
521 } // namespace Tomographer
522 
523 #endif
Utilities for formatting strings.
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Definition: multiproc.h:435
Base namespace for the Tomographer project.
Definition: densellh.h:45
std::vector< TaskStatusReportType, typename Tools::NeedOwnOperatorNew< TaskStatusReportType >::AllocatorType > workers_reports
List with the raw report submitted from each individual thread.
Definition: multiproc.h:134
std::vector< bool > workers_running
List specifying for each worker (e.g. a spawned thread) whether it is active or not.
Definition: multiproc.h:122
Provide appropriate operator new() definitions for a structure which has a member of the given stored...
TaskCountIntType num_completed
Number of completed tasks.
Definition: multiproc.h:109
std::string getHumanReport() const
Produce a text-based human-readable short representation of the status report.
Definition: multiproc.h:166
T duration_cast(T... args)
T end(T... args)
std::string fmtDuration(double seconds)
Format a number of seconds into a human-readable string.
Definition: fmt.h:370
T setw(T... args)
T resize(T... args)
STL class.
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
Definition: multiproc.h:485
double totalFractionDone() const
The total fraction of the job completed.
Definition: multiproc.h:149
T str(T... args)
T clear(T... args)
Basic status report class.
Definition: multiproc.h:69
STL class.
T move(T... args)
T count(T... args)
T fixed(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
TaskCountIntType num_total_runs
Total number of tasks to perform.
Definition: multiproc.h:112
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
Definition: multiproc.h:430
T begin(T... args)
Executes multiple tasks sequentially.
Definition: multiproc.h:255
T setprecision(T... args)
void requestStatusReport()
Request a status report.
Definition: multiproc.h:472
double elapsed
Number of seconds elapsed since launching the tasks.
Definition: multiproc.h:139
STL class.
void requestInterrupt()
Interrupt all tasks as soon as possible.
Definition: multiproc.h:496
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
Definition: multiproc.h:456
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
Definition: multiproc.h:442
A complete status report, abstract version.
Definition: multiproc.h:97