Tomographer  v5.2
Tomographer C++ Framework Documentation
multiproc.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROC_H
29 #define MULTIPROC_H
30 
31 #include <csignal>
32 
33 #include <string>
34 #include <chrono>
35 #include <exception>
36 
37 #include <tomographer/tools/fmt.h>
39 
40 
41 
50 namespace Tomographer {
51 
52 namespace MultiProc {
53 
54 
55 
66 struct TOMOGRAPHER_EXPORT TaskStatusReport
67 {
69  : fraction_done(0), msg("<unknown>")
70  { }
71  TaskStatusReport(double fraction_done_, std::string msg_)
72  : fraction_done(fraction_done_), msg(std::move(msg_))
73  { }
74 
75  double fraction_done;
76  std::string msg;
77 };
78 
79 
84 template<typename TaskStatusReportType, typename TaskCountIntType>
85 struct TOMOGRAPHER_EXPORT FullStatusReport
86 {
88  : num_completed(0),
89  num_total_runs(0),
90  workers_running(),
91  workers_reports(),
93  {
94  }
95 
97  TaskCountIntType num_completed;
98 
100  TaskCountIntType num_total_runs;
101 
111 
121  std::vector<TaskStatusReportType,
123 
127  double elapsed;
128 
129 
137  inline double totalFractionDone() const
138  {
139  // idea: cumulate in f one unit per task completed.
140  double f = num_completed;
141  for (std::size_t k = 0; k < workers_running.size(); ++k) {
142  if (workers_running[k]) {
143  // partially completed tasks contribute a fraction
144  f += workers_reports[k].fraction_done;
145  }
146  }
147  // and f goes from zero to num_total_runs
148  return f / num_total_runs;
149  }
150 
155  {
158  (std::chrono::milliseconds::rep)(elapsed*1000)
159  ));
161  ss << "=========================== Intermediate Progress Report ============================\n"
162  << " "
163  << elapsed_s << "s elapsed"
164  << " - "
165  << num_completed << "/" << num_total_runs << " runs completed"
166  << " - "
167  << std::fixed << std::setw(5) << std::setprecision(2) << totalFractionDone() * 100.0 << "% total done"
168  << "\n";
169 
170  if (workers_running.size() == 0) {
171  // no info
172  } else if (workers_running.size() == 1) {
173  if (workers_running[0]) {
174  ss << "--> " << workers_reports[0].msg << "\n";
175  }
176  } else {
177  ss << "Current Run(s) information (workers working/spawned "
178  << std::count(workers_running.begin(), workers_running.end(), true)
179  << "/" << workers_running.size() << "):\n";
180  for (std::size_t k = 0; k < workers_running.size(); ++k) {
181  ss << "=== " << std::setw(2) << k << ": ";
182  if (!workers_running[k]) {
183  ss << "<idle>\n";
184  } else {
185  ss << workers_reports[k].msg << "\n";
186  }
187  }
188  }
189  ss << "=====================================================================================\n";
190  return ss.str();
191  }
192 };
193 
194 
195 
196 class TOMOGRAPHER_EXPORT TasksInterruptedException : public std::exception
197 {
198  std::string msg_;
199 public:
200  TasksInterruptedException(std::string msg = "Tasks Interrupted.") : msg_(msg) { }
201  virtual ~TasksInterruptedException() throw() { }
202  const char * what() const throw() { return msg_.c_str(); }
203 };
204 
205 
206 
207 namespace Sequential {
208 
228 template<typename TaskType_, typename TaskCData_,
229  typename LoggerType_, typename TaskCountIntType_ = int>
230 class TOMOGRAPHER_EXPORT TaskDispatcher
231 {
232 public:
233  typedef TaskType_ TaskType;
234  typedef typename TaskType::StatusReportType TaskStatusReportType;
235  typedef TaskCData_ TaskCData;
236  typedef LoggerType_ LoggerType;
237  typedef TaskCountIntType_ TaskCountIntType;
238 
239  // not directly needed, but make sure TaskType::ResultType exists as part of testing the
240  // task, cdata and result-collector's correct type interface implementation
241  typedef typename TaskType::ResultType TaskResultType;
242 
244 
246 
247 private:
248 
249  const TaskCData * pcdata;
251  LoggerType & logger;
252 
253  TaskCountIntType num_total_runs;
254 
258  TaskCountIntType task_k;
259 
260  typedef
261 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
262  std::chrono::monotonic_clock // for GCC/G++ 4.6
263 #else
265 #endif
266  StdClockType;
267 
268 
269  struct TaskMgrIface {
270  TaskMgrIface(TaskDispatcher * dispatcher_)
271  : dispatcher(dispatcher_),
272  interrupt_requested(0),
273  status_report_requested(0),
274  status_report_user_fn(),
275  _tasks_start_time(StdClockType::now()),
276  _last_status_report(StdClockType::now()),
277  _status_report_periodic_interval(0)
278  {
279  }
280 
281  private:
282  TaskDispatcher * dispatcher;
283 
284  volatile std::sig_atomic_t interrupt_requested; // could be written to by signal handler
285  volatile std::sig_atomic_t status_report_requested; // could be written to by signal handler
286  FullStatusReportCallbackType status_report_user_fn;
287 
288  const StdClockType::time_point _tasks_start_time;
289  StdClockType::time_point _last_status_report;
290  StdClockType::duration _status_report_periodic_interval;
291 
292  friend class TaskDispatcher;
293 
294  inline void _request_status_report() {
295  status_report_requested = 1;
296  }
297  inline void _request_interrupt() {
298  interrupt_requested = 1;
299  }
300  template<typename IntType>
301  inline void _request_periodic_status_report(IntType milliseconds) {
302  if ( milliseconds >= 0 ) {
303  _status_report_periodic_interval = std::chrono::duration_cast<StdClockType::duration>(
304  std::chrono::milliseconds(1+milliseconds)
305  );
306  } else {
307  _status_report_periodic_interval = StdClockType::duration(0);
308  }
309  }
310 
311  public:
312  inline bool statusReportRequested() const
313  {
314  if (interrupt_requested) {
316  }
317  if (_status_report_periodic_interval.count() > 0
318  && (StdClockType::now() - (_last_status_report + _status_report_periodic_interval)).count() > 0) {
319  return true;
320  }
321  return (bool) status_report_requested;
322  }
323 
324  inline void submitStatusReport(const TaskStatusReportType &statreport)
325  {
326  FullStatusReportType fullstatus;
327 
328  fullstatus.num_completed = dispatcher->task_k;
329  fullstatus.num_total_runs = dispatcher->num_total_runs;
330 
331  // initialize task-specific reports
332  // fill our lists with default-constructed values & set all running to false.
333  fullstatus.workers_running.clear();
334  fullstatus.workers_reports.clear();
335 
336  fullstatus.workers_running.resize(1, false);
337  fullstatus.workers_running[0] = true;
338 
339  fullstatus.workers_reports.resize(1);
340  fullstatus.workers_reports[0] = statreport;
341 
343  StdClockType::now() - _tasks_start_time
344  ).count() * 1e-6;
345 
346  status_report_user_fn(std::move(fullstatus));
347 
348  status_report_requested = false;
349  _last_status_report = StdClockType::now();
350  }
351 
352  };
353 
354  TaskMgrIface mgriface;
355 
356 public:
357  TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_, TaskCountIntType num_total_runs_)
358  : pcdata(pcdata_), results(), logger(logger_), num_total_runs(num_total_runs_),
359  mgriface(this)
360  {
361  }
362  ~TaskDispatcher() {
363  for ( auto r : results ) {
364  if (r != NULL) {
365  delete r;
366  }
367  }
368  }
369 
374  void run()
375  {
376  results.resize((std::size_t)num_total_runs, NULL);
377 
378  logger.debug("MultiProc::Sequential::TaskDispatcher::run()", "preparing for sequential runs");
379 
380  for (task_k = 0; task_k < num_total_runs; ++task_k) {
381 
382  logger.debug("Tomographer::MultiProc::Sequential::TaskDispatcher::run()",
383  [&](std::ostream & stream) { stream << "Running task #" << task_k << " ..."; });
384 
385  auto input = pcdata->getTaskInput(task_k);
386 
387  // construct a new task instance
388  TaskType t(input, pcdata, logger);
389 
390  // and run it
391  t.run(pcdata, logger, &mgriface);
392 
393  // and collect the result
394  logger.longdebug("MultiProc::Sequential::TaskDispatcher::run()", "collecting result");
395  results[(std::size_t)task_k] = new TaskResultType(t.stealResult());
396  }
397 
398  // all done
399  logger.debug("MultiProc::Sequential::TaskDispatcher::run()", "all done");
400  }
401 
405  inline TaskCountIntType numTaskRuns() const { return num_total_runs; }
406 
411  return results;
412  }
413 
417  inline const TaskResultType & collectedTaskResult(std::size_t k) const { return *results[k]; }
418 
419 
430  template<typename Fn>
431  inline void setStatusReportHandler(Fn fnstatus)
432  {
433  mgriface.status_report_user_fn = fnstatus;
434  }
435 
447  inline void requestStatusReport()
448  {
449  mgriface._request_status_report();
450  }
451 
459  template<typename IntType>
460  inline void requestPeriodicStatusReport(IntType milliseconds)
461  {
462  mgriface._request_periodic_status_report(milliseconds);
463  }
464 
471  inline void requestInterrupt()
472  {
473  mgriface._request_interrupt();
474  }
475 
476 }; // class TaskDispatcher
477 
478 
479 
480 template<typename TaskType_, typename TaskCData_,
481  typename LoggerType_, typename TaskCountIntType_ = int>
483 mkTaskDispatcher(TaskCData_ * pcdata_, LoggerType_ & logger_, TaskCountIntType_ num_total_runs_)
484 {
486  pcdata_, logger_, num_total_runs_
487  );
488 }
489 
490 
491 
492 
493 
494 } // namespace Sequential
495 } // namespace MultiProc
496 } // namespace Tomographer
497 
498 #endif
Utilities for formatting strings.
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Definition: multiproc.h:410
Base namespace for the Tomographer project.
Definition: densellh.h:45
std::vector< TaskStatusReportType, typename Tools::NeedOwnOperatorNew< TaskStatusReportType >::AllocatorType > workers_reports
List with the raw report submitted from each individual thread.
Definition: multiproc.h:122
std::vector< bool > workers_running
List specifying for each worker (e.g. a spawned thread) whether it is active or not.
Definition: multiproc.h:110
Provide appropriate operator new() definitions for a structure which has a member of the given stored...
TaskCountIntType num_completed
Number of completed tasks.
Definition: multiproc.h:97
std::string getHumanReport() const
Produce a text-based human-readable short representation of the status report.
Definition: multiproc.h:154
T duration_cast(T... args)
T end(T... args)
std::string fmtDuration(double seconds)
Format a number of seconds into a human-readable string.
Definition: fmt.h:370
T setw(T... args)
T resize(T... args)
STL class.
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
Definition: multiproc.h:460
double totalFractionDone() const
The total fraction of the job completed.
Definition: multiproc.h:137
T str(T... args)
T clear(T... args)
Basic status report class.
Definition: multiproc.h:66
STL class.
T move(T... args)
T count(T... args)
T fixed(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
TaskCountIntType num_total_runs
Total number of tasks to perform.
Definition: multiproc.h:100
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
Definition: multiproc.h:405
T begin(T... args)
Executes multiple tasks sequentially.
Definition: multiproc.h:230
T setprecision(T... args)
void requestStatusReport()
Request a status report.
Definition: multiproc.h:447
double elapsed
Number of seconds elapsed since launching the tasks.
Definition: multiproc.h:127
STL class.
void requestInterrupt()
Interrupt all tasks as soon as possible.
Definition: multiproc.h:471
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
Definition: multiproc.h:431
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
Definition: multiproc.h:417
A complete status report, abstract version.
Definition: multiproc.h:85