Tomographer  v5.2
Tomographer C++ Framework Documentation
multiprocthreadcommon.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCTHREADCOMMON_H
29 #define MULTIPROCTHREADCOMMON_H
30 
31 #include <chrono>
32 #include <stdexcept>
33 
34 #include <boost/exception/diagnostic_information.hpp>
35 
37 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
39 #include <tomographer/multiproc.h>
40 
41 
42 
54 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP
55 // use MS Window's Sleep() function
56 # include <windows.h>
57 # define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x))
58 #else
59 // normal C++11 API function, not available on mingw32 w/ win threads
60 # include <thread>
61 # ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD
62 # include <mingw.thread.h>
63 # endif
64 # define TOMOGRAPHER_SLEEP_FOR_MS(x) \
65  std::this_thread::sleep_for(std::chrono::milliseconds((x)))
66 #endif
67 
68 
69 
70 namespace Tomographer {
71 namespace MultiProc {
72 namespace ThreadCommon {
73 
74 
75 
94 template<typename TaskType_, typename TaskCountIntType_ = int>
95 class TOMOGRAPHER_EXPORT TaskDispatcherBase
96 {
97 public:
99  typedef TaskType_ TaskType;
101  typedef TaskCountIntType_ TaskCountIntType;
102 
104  typedef typename TaskType::ResultType TaskResultType;
106  typedef typename TaskType::StatusReportType TaskStatusReportType;
107 
110 
118 
119 
120 protected:
121 
122  typedef
123 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
124  std::chrono::monotonic_clock // for GCC/G++ 4.6
125 #else
127 #endif
128  StdClockType;
129 
131  std::string msg;
132  public:
133  TaskInterruptedInnerException() : msg("Task Interrupted") { }
134  virtual ~TaskInterruptedInnerException() throw() { };
135  const char * what() const throw() { return msg.c_str(); }
136  };
137 
139  template<typename TaskCData, typename LoggerType>
141  ThreadSharedData(const TaskCData * pcdata_, LoggerType & logger_,
142  TaskCountIntType num_total_runs, int num_threads)
143  : pcdata(pcdata_),
144  results((std::size_t)num_total_runs, NULL),
145  logger(logger_),
146  time_start(StdClockType::now()),
147  schedule(num_total_runs, num_threads),
148  status_report()
149  {
150  }
151 
153  : pcdata(x.pcdata),
154  results(std::move(x.results)),
155  logger(x.logger),
156  time_start(std::move(x.time_start)),
157  schedule(std::move(x.schedule)),
158  status_report(std::move(x.status_report))
159  { }
160 
162  {
163  for (auto r : results) {
164  if (r != NULL) {
165  delete r;
166  }
167  }
168  }
169 
170  const TaskCData * pcdata;
171 
172  // Apparently it would be better if the elements are aligned in memory, not sure if
173  // it's important here: http://stackoverflow.com/a/41387941/1694896
175 
176  LoggerType & logger;
177 
178  StdClockType::time_point time_start;
179 
180  struct Schedule {
181  int num_threads;
182  int num_active_working_threads;
183 
184  const TaskCountIntType num_total_runs;
185  TaskCountIntType num_completed;
186  TaskCountIntType num_launched;
187 
188  volatile std::sig_atomic_t interrupt_requested;
189  std::vector<std::exception_ptr> inner_exception;
190 
191  Schedule(TaskCountIntType num_total_runs_, int num_threads_)
192  : num_threads(num_threads_),
193  num_active_working_threads(0),
194  num_total_runs(num_total_runs_),
195  num_completed(0),
196  num_launched(0),
197  interrupt_requested(0),
198  inner_exception()
199  {
200  }
201  Schedule(Schedule && x)
202  : num_threads(std::move(x.num_threads)),
203  num_active_working_threads(x.num_active_working_threads),
204  num_total_runs(x.num_total_runs),
205  num_completed(x.num_completed),
206  num_launched(x.num_launched),
207  interrupt_requested(x.interrupt_requested),
208  inner_exception(std::move(x.inner_exception))
209  {
210  }
211  };
212  Schedule schedule;
213 
214  struct StatusReport {
215  bool in_preparation;
216  bool ready;
217  int periodic_interval;
218  int num_waiting_reports;
219 
220  FullStatusReportType full_report;
221  FullStatusReportCallbackType user_fn;
222 
225 
229  unsigned int event_counter_master;
230 
232  StdClockType::time_point last_report_time;
233 
234  StatusReport()
235  : in_preparation(false),
236  ready(false),
237  periodic_interval(-1),
238  num_waiting_reports(0),
239  full_report(),
240  user_fn(),
241  event_counter_user(0),
242  event_counter_master(0u),
243  last_report_time()
244  {
245  }
247  : in_preparation(x.in_preparation),
248  ready(x.ready),
249  periodic_interval(x.periodic_interval),
250  num_waiting_reports(x.num_waiting_reports),
251  full_report(std::move(x.full_report)),
252  user_fn(std::move(x.user_fn)),
253  event_counter_user(x.event_counter_user),
254  event_counter_master(x.event_counter_master),
255  last_report_time(x.last_report_time)
256  {
257  }
258  };
259  StatusReport status_report;
260  };
261 
263  template<typename ThreadSharedDataType, typename TaskLocalLoggerType, typename CriticalExecutionLocker>
265  {
266  const int thread_id;
267 
268  CriticalExecutionLocker & locker;
269 
270  ThreadSharedDataType * shared_data;
271 
272  TaskLocalLoggerType & llogger;
273 
274  TaskCountIntType task_id;
275 
276  // only used by master thread
277  int local_status_report_event_counter_user;
278  // used by all threads
279  unsigned int local_status_report_event_counter;
280 
281 
282  ThreadPrivateData(int thread_id_, ThreadSharedDataType * shared_data_,
283  TaskLocalLoggerType & llogger_, CriticalExecutionLocker & locker_)
284  : thread_id(thread_id_),
285  locker(locker_),
286  shared_data(shared_data_),
287  llogger(llogger_),
288  task_id(-1),
289  local_status_report_event_counter_user(0),
290  local_status_report_event_counter(0u)
291  {
292  }
294  : thread_id(x.thread_id),
295  locker(x.locker),
296  shared_data(x.shared_data),
297  llogger(x.llogger),
298  task_id(x.task_id),
299  local_status_report_event_counter_user(x.local_status_report_event_counter_user),
300  local_status_report_event_counter(x.local_status_report_event_counter)
301  {
302  }
303 
304  inline bool statusReportRequested()
305  {
306  if (shared_data->schedule.interrupt_requested) {
307  llogger.subLogger("/TaskManagerIface::statusReportRequested()")
308  .longdebug("Tasks interrupt has been requested");
310  }
311 
312  //
313  // if we're the master thread, we have some admin to do.
314  //
315  if (thread_id == 0) {
316  // update event counters
317  _master_update_event_counter();
318 
319  // if we're the master thread, then also check if there is a status report ready
320  // to be sent.
321  if (shared_data->status_report.ready) {
322  _master_send_status_report();
323  }
324  } // master thread
325 
326  return local_status_report_event_counter != shared_data->status_report.event_counter_master;
327  }
328 
329  // internal use only:
330  inline void _master_update_event_counter()
331  {
332  if (local_status_report_event_counter_user != (int)shared_data->status_report.event_counter_user) {
333 
334  // user requested new status report -- need to do some initialization etc.
335 
336  // first, note that we have responded to this request
337  local_status_report_event_counter_user = (int)shared_data->status_report.event_counter_user;
338 
339  _master_initiate_status_report();
340 
341  } else if ( shared_data->status_report.periodic_interval > 0 &&
342  ( std::chrono::duration_cast<std::chrono::milliseconds>(
343  StdClockType::now() - shared_data->status_report.last_report_time
344  ).count() > shared_data->status_report.periodic_interval ) ) {
345  // enough time has passed since last status report, generate new one
346 
347  _master_initiate_status_report();
348 
349  }
350  }
351 
352  inline void _master_initiate_status_report()
353  {
354  locker.critical_status_report([&]() {
355  // a LocalLogger wrapping the unprotected shared_data->logger (we are
356  // already in a critical section!!)
358  llogger.originPrefix() + llogger.glue()
359  + std::string("TaskManagerIface::statusReportRequested()"),
360  shared_data->logger
361  );
362 
363  // report already in preparation, ignore this request
364  if (shared_data->status_report.in_preparation) {
365  logger.debug("Still working on previous status report, ignoring new report due");
366  return; // no new status report, we're still working on previous one
367  }
368 
369  if (!shared_data->status_report.user_fn) {
370  // no user handler set
371  logger.warning("no user status report handler set! Call setStatusReportHandler() first.");
372  return;
373  }
374 
375  shared_data->status_report.in_preparation = true;
376  shared_data->status_report.ready = false;
377 
378  // mark the last report time as the moment the report is
379  // *initiated*, so that report interval does not get added the
380  // overhead of preparing the report itself
381  shared_data->status_report.last_report_time = StdClockType::now();
382 
383  // initialize status report object & overall data
384  shared_data->status_report.full_report = FullStatusReportType();
385  shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
386  shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
387  shared_data->status_report.full_report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
388  StdClockType::now() - shared_data->time_start
389  ).count() * 1e-3;
390  int num_threads = shared_data->schedule.num_threads;
391 
392  // initialize task-specific reports
393  // fill our lists with default-constructed values & set all running to false.
394  shared_data->status_report.full_report.workers_running.clear();
395  shared_data->status_report.full_report.workers_reports.clear();
396  shared_data->status_report.full_report.workers_running.resize((std::size_t)num_threads, false);
397  shared_data->status_report.full_report.workers_reports.resize((std::size_t)num_threads);
398 
399  shared_data->status_report.num_waiting_reports = shared_data->schedule.num_active_working_threads;
400 
401  logger.debug([&](std::ostream & stream) {
402  stream << "vectors resized to workers_running.size()="
403  << shared_data->status_report.full_report.workers_running.size()
404  << " and workers_reports.size()="
405  << shared_data->status_report.full_report.workers_reports.size()
406  << ".";
407  });
408 
409  // now update the master event counter, so that all threads provide their reports
410  ++ shared_data->status_report.event_counter_master;
411  }) ;
412  }
413 
414  inline void _master_send_status_report()
415  {
416  auto logger = llogger.subLogger("/TaskManagerIface::statusReportRequested()");
417  logger.longdebug("Status report is ready, sending to user function.");
418 
419  locker.critical_status_report_and_user_fn([&](){
420  // call user-defined status report handler
421  shared_data->status_report.user_fn(std::move(shared_data->status_report.full_report));
422  // all reports received: done --> reset our status_report flags
423  shared_data->status_report.in_preparation = false;
424  shared_data->status_report.ready = false;
425  shared_data->status_report.num_waiting_reports = 0;
426  shared_data->status_report.full_report.workers_running.clear();
427  shared_data->status_report.full_report.workers_reports.clear();
428  }) ;
429  }
430 
431  inline void submitStatusReport(const TaskStatusReportType & report)
432  {
433  local_status_report_event_counter = shared_data->status_report.event_counter_master;
434 
435  // use protected logger
436  llogger.longdebug([&](std::ostream & stream) {
437  stream << "status report received for thread #" << thread_id
438  << ", treating it ... "
439  << "number of reports still expected="
440  << shared_data->status_report.num_waiting_reports
441  << " num_active_working_threads="
442  << shared_data->schedule.num_active_working_threads ;
443  });
444 
445  //
446  // Report the data corresponding to this thread.
447  //
448  llogger.debug([&](std::ostream & stream) {
449  stream << "thread_id=" << thread_id << ", workers_reports.size()="
450  << shared_data->status_report.full_report.workers_reports.size();
451  }) ;
452 
453  locker.critical_status_report_and_schedule([&]() {
454 
455  // do all of this inside critical section, to make sure the
456  // worker_reports vector is memory-updated from the master thread
457 
458  if (thread_id < 0 ||
459  thread_id >= (int)shared_data->status_report.full_report.workers_reports.size()) {
460  fprintf(
461  stderr,
462  "Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase::TaskPrivateData::submitStatusReport(): "
463  "Internal inconsistency: thread_id=%d out of range [0,%d]\n",
464  thread_id, (int)shared_data->status_report.full_report.workers_reports.size()
465  ) ;
466  -- shared_data->status_report.num_waiting_reports ;
467  return;
468  }
469 
470  shared_data->status_report.full_report.workers_running[(std::size_t)thread_id] = true;
471  shared_data->status_report.full_report.workers_reports[(std::size_t)thread_id] = report;
472 
473  int num_waiting = -- shared_data->status_report.num_waiting_reports ;
474 
475  if (num_waiting <= 0) {
476  // The report is ready to be transmitted to the user. But don't send it
477  // directly quite yet, let the master thread send it. We add this
478  // guarantee so that the status report handler can do things which only
479  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
480  shared_data->status_report.ready = true;
481  }
482  }) ;
483 
484  } // submitStatusReport()
485 
486  inline void _interrupt_with_inner_exception(std::exception_ptr exc)
487  {
488  locker.critical_schedule([&]() {
489  shared_data->schedule.interrupt_requested = 1;
490  shared_data->schedule.inner_exception.push_back(exc);
491  });
492  }
493 
494 
495  }; // ThreadPrivateData
496 
497 
498 
503  {
504  }
505 
507  {
508  }
509 
511  {
512  }
513 
514 
/** \brief A new worker enters the game.
 *
 * Registers the thread in the schedule and synchronizes its local
 * status-report event counter with the master's, inside a critical section.
 */
template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
void run_worker_enter(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
{
  private_data.locker.critical_status_report_and_schedule([&]() {
    ++ shared_data.schedule.num_active_working_threads;
    // If a status report is currently being assembled, pretend we have not
    // yet responded to the current master event, so that we submit our own
    // report later; otherwise start out in sync with the master counter.
    const unsigned int master_counter = shared_data.status_report.event_counter_master;
    private_data.local_status_report_event_counter =
      shared_data.status_report.in_preparation ? (master_counter - 1) : master_counter;
  });
}
533 
/** \brief A worker exits the game.
 *
 * Unregisters the thread from the schedule; if a status report is being
 * assembled, keeps the report accounting consistent so the master is not
 * left waiting for a report this thread will never send.
 *
 * Fix: removed the stray ';' that followed the function body.
 */
template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
void run_worker_exit(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
{
  private_data.locker.critical_status_report_and_schedule([&]() {
    -- shared_data.schedule.num_active_working_threads;
    if (shared_data.status_report.in_preparation) {
      // we're just leaving the game in the middle of a status report preparation
      // moment -- so make sure we don't mess up with the status reporting
      // accounting.
      if (private_data.local_status_report_event_counter
          != shared_data.status_report.event_counter_master) {
        // We haven't sent in our report yet. No problem, but just flag the
        // full-status-report as ready if we're the last reporting thread
        // everyone's waiting for
        if (shared_data.status_report.num_waiting_reports == 1) {
          // it's our report they're waiting for -- leave the thread as idle
          shared_data.status_report.num_waiting_reports = 0;
          shared_data.status_report.ready = true;
        }
      } else {
        // Report has already been sent in, no problem, as
        // num_waiting_reports is still accurate
      }
    }
  });
}
562 
563 
564 
568  template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
570  run_task(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
571  {
572  auto & logger = private_data.llogger;
573  //... = Tomographer::Logger::makeLocalLogger(TOMO_ORIGIN, private_data.llogger.parentLogger());
574 
575  try {
576 
577  // do not execute task if an interrupt was requested.
578  if (shared_data.schedule.interrupt_requested) {
580  }
581 
582  logger.longdebug([&](std::ostream & stream) {
583  stream << "Run #" << private_data.task_id << ": querying CData for task input";
584  }) ;
585 
586  const auto input = shared_data.pcdata->getTaskInput(private_data.task_id);
587 
588  logger.longdebug([&](std::ostream & stream) {
589  stream << "Running task #" << private_data.task_id;
590  }) ;
591 
592  // construct a new task instance
593  TaskType t(input, shared_data.pcdata, logger.parentLogger());
594 
595  // not sure an std::ostream would be safe here threadwise...?
596  logger.longdebug([&](std::ostream & stream) {
597  stream << "Task #" << private_data.task_id << " set up.";
598  }) ;
599 
600  // and run it
601  t.run(shared_data.pcdata, logger.parentLogger(), &private_data);
602 
603  logger.longdebug([&](std::ostream & stream) {
604  stream << "Task #" << private_data.task_id << " finished, about to collect result.";
605  }) ;
606 
607  // collect result -- no need for protected/locked/critical access because we
608  // are the only thread which will write to this element; vector is pre-allocated
609  shared_data.results[(std::size_t)private_data.task_id] = new TaskResultType(t.stealResult());
610 
611  private_data.locker.critical_schedule([&shared_data]() {
612  // not sure we should make this into a full atomic type, because it
613  // has custom type TaskCountIntType; plus this is a real small
614  // overhead at 1x / task executed
615  ++ shared_data.schedule.num_completed;
616  }) ;
617 
618  logger.longdebug([&](std::ostream & stream) {
619  stream << "Task #" << private_data.task_id << " done.";
620  }) ;
621 
622  } catch (TaskInterruptedInnerException & ) {
623 
624  logger.debug("Task interrupted.") ;
625  return;
626 
627  } catch (...) {
628 
629  private_data._interrupt_with_inner_exception(std::current_exception());
630  return;
631 
632  }
633 
634  }
635 
638  template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
639  void master_continue_monitoring_status(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
640  {
641  do {
642 
643  // check if we need to send a status report every 100ms
644  TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
645 
646  try {
647 
648  private_data.statusReportRequested();
649 
650  } catch (TaskInterruptedInnerException & ) {
651 
652  private_data.llogger.debug("[master] tasks were interrupted, returning") ;
653  return;
654 
655  } catch (...) {
656 
657  private_data.llogger.debug("[master] Exception caught inside task!") ;
658  private_data._interrupt_with_inner_exception( std::current_exception() );
659  private_data.llogger.debug("[master] Exception caught inside task -- handled.") ;
660  return;
661 
662  }
663 
664  } while (shared_data.schedule.num_active_working_threads > 0) ;
665 
666  }
667 
671  template<typename ThreadSharedDataType, typename LocalLoggerType>
672  void run_epilog(ThreadSharedDataType & shared_data, LocalLoggerType & llogger)
673  {
674  if (shared_data.schedule.inner_exception.size()) {
675  // interrupt was requested because of an inner exception, not an explicit interrupt request
676  if (shared_data.schedule.inner_exception.size() > 1) {
677  llogger.warning("Multiple exceptions caught in tasks, only the first one is re-thrown");
678  }
679  std::rethrow_exception(shared_data.schedule.inner_exception[0]);
680  }
681 
682  // if tasks were interrupted, throw the corresponding exception
683  if (shared_data.schedule.interrupt_requested) {
685  }
686  }
687 
688 }; // class TaskDispatcherBase
689 
690 
691 
692 } // namespace ThreadCommon
693 } // namespace MultiProc
694 
695 } // namespace Tomographer
696 
697 
698 
699 
700 
701 #endif
void run_worker_exit(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
A worker exits the game.
void run_worker_enter(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
New worker in the game.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
Definition: densellh.h:45
FullStatusReport< TaskStatusReportType, TaskCountIntType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
Thread-local worker variables and data — also serves as the TaskManagerIface seen by running tasks
T duration_cast(T... args)
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T current_exception(T... args)
StdClockType::time_point last_report_time
Only used by master thread to detect when to send periodic status reports.
volatile std::sig_atomic_t event_counter_user
Gets incremented when user requests status report.
STL class.
void debug(const char *fmt,...)
Generate a log message with level Logger::DEBUG (printf-like syntax)
Definition: loggers.h:1803
unsigned int event_counter_master
Master thread increments this whenever other threads should provide status report – use unsigned to ...
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:528
void TOMOGRAPHER_CXX_STACK_FORCE_REALIGN run_task(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
Run a given task.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Definition: loggers.h:1960
void run_epilog(ThreadSharedDataType &shared_data, LocalLoggerType &llogger)
To be called after all workers are done, to e.g. throw proper exception if an error occurred...
Some C++ utilities, with a tad of C++11 tricks.
STL class.
T move(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
STL class.
Some common definitions for multiprocessing interfaces.
T c_str(T... args)
void master_continue_monitoring_status(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
To be called by master thread only to continue monitoring for status reports.
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
T rethrow_exception(T... args)
STL class.
Provide common functionality to thread-based MultiProc implementations.
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:85