Tomographer  v5.4
Tomographer C++ Framework Documentation
multiprocthreadcommon.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCTHREADCOMMON_H
29 #define MULTIPROCTHREADCOMMON_H
30 
31 #include <chrono>
32 #include <stdexcept>
33 
34 #include <boost/exception/diagnostic_information.hpp>
35 
37 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
39 #include <tomographer/multiproc.h>
40 
41 
42 
54 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP
55 // use MS Window's Sleep() function
56 # include <windows.h>
57 # define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x))
58 #else
59 // normal C++11 API function, not available on mingw32 w/ win threads
60 # include <thread>
61 # ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD
62 # include <mingw.thread.h>
63 # endif
64 # define TOMOGRAPHER_SLEEP_FOR_MS(x) \
65  std::this_thread::sleep_for(std::chrono::milliseconds((x)))
66 #endif
67 
68 
69 
70 namespace Tomographer {
71 namespace MultiProc {
72 namespace ThreadCommon {
73 
74 
75 
97 template<typename TaskType_, typename TaskCountIntType_ = int>
98 class TOMOGRAPHER_EXPORT TaskDispatcherBase
99 {
100 public:
102  typedef TaskType_ TaskType;
104  typedef TaskCountIntType_ TaskCountIntType;
105 
107  typedef typename TaskType::ResultType TaskResultType;
109  typedef typename TaskType::StatusReportType TaskStatusReportType;
110 
113 
121 
122  // \a TaskCountIntType must be a signed integer type, because we might need to set the
123  // special value \a -1
125 
126 protected:
127 
128  typedef
129 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
130  std::chrono::monotonic_clock // for GCC/G++ 4.6
131 #else
133 #endif
134  StdClockType;
135 
137  std::string msg;
138  public:
139  TaskInterruptedInnerException() : msg("Task Interrupted") { }
140  virtual ~TaskInterruptedInnerException() throw() { };
141  const char * what() const throw() { return msg.c_str(); }
142  };
143 
145  template<typename TaskCData, typename LoggerType>
147  ThreadSharedData(const TaskCData * pcdata_, LoggerType & logger_,
148  TaskCountIntType num_total_runs, int num_threads)
149  : pcdata(pcdata_),
150  results((std::size_t)num_total_runs, NULL),
151  logger(logger_),
152  time_start(StdClockType::now()),
153  schedule(num_total_runs, num_threads),
154  status_report()
155  {
156  }
157 
159  : pcdata(x.pcdata),
160  results(std::move(x.results)),
161  logger(x.logger),
162  time_start(std::move(x.time_start)),
163  schedule(std::move(x.schedule)),
164  status_report(std::move(x.status_report))
165  { }
166 
168  {
169  for (auto r : results) {
170  if (r != NULL) {
171  delete r;
172  }
173  }
174  }
175 
176  const TaskCData * pcdata;
177 
178  // Apparently it would be better if the elements are aligned in memory, not sure if
179  // it's important here: http://stackoverflow.com/a/41387941/1694896
181 
182  LoggerType & logger;
183 
184  StdClockType::time_point time_start;
185 
186  struct Schedule {
187  int num_threads;
188  int num_active_working_threads;
189 
190  const TaskCountIntType num_total_runs;
191  TaskCountIntType num_completed;
192  TaskCountIntType num_launched;
193 
194  volatile std::sig_atomic_t interrupt_requested;
195  std::vector<std::exception_ptr> inner_exception;
196 
197  Schedule(TaskCountIntType num_total_runs_, int num_threads_)
198  : num_threads(num_threads_),
199  num_active_working_threads(0),
200  num_total_runs(num_total_runs_),
201  num_completed(0),
202  num_launched(0),
203  interrupt_requested(0),
204  inner_exception()
205  {
206  }
207  Schedule(Schedule && x)
208  : num_threads(std::move(x.num_threads)),
209  num_active_working_threads(x.num_active_working_threads),
210  num_total_runs(x.num_total_runs),
211  num_completed(x.num_completed),
212  num_launched(x.num_launched),
213  interrupt_requested(x.interrupt_requested),
214  inner_exception(std::move(x.inner_exception))
215  {
216  }
217  };
218  Schedule schedule;
219 
220  struct StatusReport {
221  bool in_preparation;
222  bool ready;
223  int periodic_interval;
224  int num_waiting_reports;
225 
226  FullStatusReportType full_report;
227  FullStatusReportCallbackType user_fn;
228 
231 
235  unsigned int event_counter_master;
236 
238  StdClockType::time_point last_report_time;
239 
240  StatusReport()
241  : in_preparation(false),
242  ready(false),
243  periodic_interval(-1),
244  num_waiting_reports(0),
245  full_report(),
246  user_fn(),
247  event_counter_user(0),
248  event_counter_master(0u),
249  last_report_time()
250  {
251  }
253  : in_preparation(x.in_preparation),
254  ready(x.ready),
255  periodic_interval(x.periodic_interval),
256  num_waiting_reports(x.num_waiting_reports),
257  full_report(std::move(x.full_report)),
258  user_fn(std::move(x.user_fn)),
259  event_counter_user(x.event_counter_user),
260  event_counter_master(x.event_counter_master),
261  last_report_time(x.last_report_time)
262  {
263  }
264  };
265  StatusReport status_report;
266  };
267 
269  template<typename ThreadSharedDataType, typename TaskLocalLoggerType, typename CriticalExecutionLocker>
271  {
272  const int thread_id;
273 
274  CriticalExecutionLocker & locker;
275 
276  ThreadSharedDataType * shared_data;
277 
278  TaskLocalLoggerType & llogger;
279 
280  TaskCountIntType task_id;
281 
282  // only used by master thread
283  int local_status_report_event_counter_user;
284  // used by all threads
285  unsigned int local_status_report_event_counter;
286 
287 
288  ThreadPrivateData(int thread_id_, ThreadSharedDataType * shared_data_,
289  TaskLocalLoggerType & llogger_, CriticalExecutionLocker & locker_)
290  : thread_id(thread_id_),
291  locker(locker_),
292  shared_data(shared_data_),
293  llogger(llogger_),
294  task_id(-1),
295  local_status_report_event_counter_user(0),
296  local_status_report_event_counter(0u)
297  {
298  }
300  : thread_id(x.thread_id),
301  locker(x.locker),
302  shared_data(x.shared_data),
303  llogger(x.llogger),
304  task_id(x.task_id),
305  local_status_report_event_counter_user(x.local_status_report_event_counter_user),
306  local_status_report_event_counter(x.local_status_report_event_counter)
307  {
308  }
309 
// statusReportRequested(): polled by the master thread (see
// master_continue_monitoring_status) and presumably by running tasks through the
// TaskManagerIface interface (run_task passes &private_data to t.run()).
// Returns true while this thread has not yet acknowledged the latest status-report
// event, i.e. while local_status_report_event_counter differs from
// shared_data->status_report.event_counter_master; submitStatusReport() resets it.
// On the master thread (thread_id == 0) it additionally updates the event counters
// (possibly initiating a new report) and forwards a completed report to the user.
// NOTE(review): original line 315 is missing from this listing; since callers catch
// TaskInterruptedInnerException, it presumably throws that exception when an
// interrupt was requested -- confirm against the real header.
310  inline bool statusReportRequested()
311  {
312  if (shared_data->schedule.interrupt_requested) {
313  llogger.subLogger("/TaskManagerIface::statusReportRequested()")
314  .longdebug("Tasks interrupt has been requested");
316  }
317 
318  //
319  // if we're the master thread, we have some admin to do.
320  //
321  if (thread_id == 0) {
322  // update event counters
323  _master_update_event_counter();
324 
325  // if we're the master thread, then also check if there is a status report ready
326  // to be sent.
327  if (shared_data->status_report.ready) {
328  _master_send_status_report();
329  }
330  } // master thread
331 
// true => this thread still owes a report for the current event
332  return local_status_report_event_counter != shared_data->status_report.event_counter_master;
333  }
334 
335  // internal use only:
336  inline void _master_update_event_counter()
337  {
338  if (local_status_report_event_counter_user != (int)shared_data->status_report.event_counter_user) {
339 
340  // user requested new status report -- need to do some initialization etc.
341 
342  // first, note that we have responded to this request
343  local_status_report_event_counter_user = (int)shared_data->status_report.event_counter_user;
344 
345  _master_initiate_status_report();
346 
347  } else if ( shared_data->status_report.periodic_interval > 0 &&
348  ( std::chrono::duration_cast<std::chrono::milliseconds>(
349  StdClockType::now() - shared_data->status_report.last_report_time
350  ).count() > shared_data->status_report.periodic_interval ) ) {
351  // enough time has passed since last status report, generate new one
352 
353  _master_initiate_status_report();
354 
355  }
356  }
357 
// _master_initiate_status_report(): master-thread helper that, inside
// locker.critical_status_report(), starts assembling a new full status report.
// It does nothing if a report is already in preparation or if no user handler
// (status_report.user_fn) is set. Otherwise it marks the report in preparation,
// records the initiation time, fills the global fields (num_completed,
// num_total_runs, elapsed seconds), resizes the per-worker vectors to num_threads,
// expects one report per currently active worker, and finally increments
// event_counter_master so that worker threads see statusReportRequested() == true.
// NOTE(review): original line 363 (start of the local `logger` declaration) is
// missing from this listing.
// NOTE(review): if num_active_working_threads is 0 at this point,
// num_waiting_reports becomes 0 and no submitStatusReport() call will set
// `ready`; confirm this cannot leave in_preparation stuck.
358  inline void _master_initiate_status_report()
359  {
360  locker.critical_status_report([&]() {
361  // a LocalLogger wrapping the unprotected shared_data->logger (we are
362  // already in a critical section!!)
364  llogger.originPrefix() + llogger.glue()
365  + std::string("TaskManagerIface::statusReportRequested()"),
366  shared_data->logger
367  );
368 
369  // report already in preparation, ignore this request
370  if (shared_data->status_report.in_preparation) {
371  logger.longdebug("Still working on previous status report, ignoring new report due");
372  return; // no new status report, we're still working on previous one
373  }
374 
375  if (!shared_data->status_report.user_fn) {
376  // no user handler set
377  logger.warning("no user status report handler set! Call setStatusReportHandler() first.");
378  return;
379  }
380 
381  shared_data->status_report.in_preparation = true;
382  shared_data->status_report.ready = false;
383 
384  // mark the last report time as the moment the report is
385  // *initiated*, so that report interval does not get added the
386  // overhead of preparing the report itself
387  shared_data->status_report.last_report_time = StdClockType::now();
388 
389  // initialize status report object & overall data
390  shared_data->status_report.full_report = FullStatusReportType();
391  shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
392  shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
393  shared_data->status_report.full_report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
394  StdClockType::now() - shared_data->time_start
395  ).count() * 1e-3;
396  int num_threads = shared_data->schedule.num_threads;
397 
398  // initialize task-specific reports
399  // fill our lists with default-constructed values & set all running to false.
400  shared_data->status_report.full_report.workers_running.clear();
401  shared_data->status_report.full_report.workers_reports.clear();
402  shared_data->status_report.full_report.workers_running.resize((std::size_t)num_threads, false);
403  shared_data->status_report.full_report.workers_reports.resize((std::size_t)num_threads);
404 
// one report expected from each worker that is active right now
405  shared_data->status_report.num_waiting_reports = shared_data->schedule.num_active_working_threads;
406 
407  logger.longdebug([&](std::ostream & stream) {
408  stream << "vectors resized to workers_running.size()="
409  << shared_data->status_report.full_report.workers_running.size()
410  << " and workers_reports.size()="
411  << shared_data->status_report.full_report.workers_reports.size()
412  << ".";
413  });
414 
415  // now update the master event counter, so that all threads provide their reports
416  ++ shared_data->status_report.event_counter_master;
417  }) ;
418  }
419 
420  inline void _master_send_status_report()
421  {
422  auto logger = llogger.subLogger("/TaskManagerIface::statusReportRequested()");
423  logger.longdebug("Status report is ready, sending to user function.");
424 
425  locker.critical_status_report_and_user_fn([&](){
426  // call user-defined status report handler
427  shared_data->status_report.user_fn(std::move(shared_data->status_report.full_report));
428  // all reports received: done --> reset our status_report flags
429  shared_data->status_report.in_preparation = false;
430  shared_data->status_report.ready = false;
431  shared_data->status_report.num_waiting_reports = 0;
432  shared_data->status_report.full_report.workers_running.clear();
433  shared_data->status_report.full_report.workers_reports.clear();
434  }) ;
435  }
436 
437  inline void submitStatusReport(const TaskStatusReportType & report)
438  {
439  local_status_report_event_counter = shared_data->status_report.event_counter_master;
440 
441  // use protected logger
442  llogger.longdebug([&](std::ostream & stream) {
443  stream << "status report received for thread #" << thread_id
444  << ", treating it ... "
445  << "number of reports still expected="
446  << shared_data->status_report.num_waiting_reports
447  << " num_active_working_threads="
448  << shared_data->schedule.num_active_working_threads
449  << " workers_reports.size()="
450  << shared_data->status_report.full_report.workers_reports.size();
451  }) ;
452 
453  locker.critical_status_report_and_schedule([&]() {
454 
455  // do all of this inside critical section, to make sure the
456  // worker_reports vector is memory-updated from the master thread
457 
458  if (thread_id < 0 ||
459  thread_id >= (int)shared_data->status_report.full_report.workers_reports.size()) {
460  fprintf(
461  stderr,
462  "Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase::TaskPrivateData::submitStatusReport(): "
463  "Internal inconsistency: thread_id=%d out of range [0,%d]\n",
464  thread_id, (int)shared_data->status_report.full_report.workers_reports.size()
465  ) ;
466  -- shared_data->status_report.num_waiting_reports ;
467  return;
468  }
469 
470  shared_data->status_report.full_report.workers_running[(std::size_t)thread_id] = true;
471  shared_data->status_report.full_report.workers_reports[(std::size_t)thread_id] = report;
472 
473  int num_waiting = -- shared_data->status_report.num_waiting_reports ;
474 
475  if (num_waiting <= 0) {
476  // The report is ready to be transmitted to the user. But don't send it
477  // directly quite yet, let the master thread send it. We add this
478  // guarantee so that the status report handler can do things which only
479  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
480  shared_data->status_report.ready = true;
481  }
482  }) ;
483 
484  } // submitStatusReport()
485 
486  inline void _interrupt_with_inner_exception(std::exception_ptr exc)
487  {
488  locker.critical_schedule([&]() {
489  shared_data->schedule.interrupt_requested = 1;
490  shared_data->schedule.inner_exception.push_back(exc);
491  });
492  }
493 
494 
495  }; // ThreadPrivateData
496 
497 
498 
503  {
504  }
505 
507  {
508  }
509 
511  {
512  }
513 
514 
// New worker in the game: register the calling thread as an active worker.
// If a full status report is being assembled right now, the thread's local
// event counter is set one behind event_counter_master, so that
// statusReportRequested() reports true and the thread contributes its report.
// Otherwise the thread starts in sync with the master counter.
template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
void run_worker_enter(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
{
  private_data.locker.critical_status_report_and_schedule([&]() {
    shared_data.schedule.num_active_working_threads += 1;

    const unsigned int lag = shared_data.status_report.in_preparation ? 1u : 0u;
    private_data.local_status_report_event_counter =
      shared_data.status_report.event_counter_master - lag;
  });
}
533 
// A worker exits the game: unregister the calling thread as an active worker.
//
// If a full status report is currently being assembled and this thread has not
// yet submitted its part (its local event counter differs from
// event_counter_master), the thread withdraws its expected contribution. This
// keeps the report able to complete.
//
// Fix: previously the withdrawal only happened when this thread was the very
// last one awaited (num_waiting_reports == 1). If other reports were still
// pending, the count could never reach zero, so `ready` was never set and
// `in_preparation` stayed true forever. Every later report request was then
// ignored. The count is now decremented exactly as submitStatusReport() does,
// and the report is flagged ready once nothing is pending.
template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
void run_worker_exit(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
{
  private_data.locker.critical_status_report_and_schedule([&]() {
    -- shared_data.schedule.num_active_working_threads;

    if (!shared_data.status_report.in_preparation) {
      return; // no status report being assembled, nothing else to account for
    }

    if (private_data.local_status_report_event_counter
        == shared_data.status_report.event_counter_master) {
      // Report has already been sent in, so num_waiting_reports is accurate.
      return;
    }

    // We still owe a report that will never arrive: stop waiting for it.
    const int num_waiting = -- shared_data.status_report.num_waiting_reports;
    if (num_waiting <= 0) {
      // let the master thread send the (now complete) report
      shared_data.status_report.ready = true;
    }
  });
}
562 
563 
564 
// run_task(): run the single task whose id is private_data.task_id.
// Steps: skip if an interrupt was already requested; fetch the task input via
// shared_data.pcdata->getTaskInput(); construct a TaskType; call its run(),
// passing &private_data as the task-manager interface; store a heap-allocated
// copy of t.stealResult() in shared_data.results[task_id] (these pointers are
// deleted by the ThreadSharedData destructor); increment schedule.num_completed
// under critical_schedule().
// Exceptions: TaskInterruptedInnerException is logged and swallowed; any other
// exception is recorded via _interrupt_with_inner_exception() (for run_epilog to
// rethrow) and swallowed, so nothing escapes the worker thread.
// NOTE(review): original lines 569 (return type; the Doxygen tooltip shows
// `void TOMOGRAPHER_CXX_STACK_FORCE_REALIGN`) and 579 (presumably a throw of
// TaskInterruptedInnerException) are missing from this listing -- confirm.
568  template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
570  run_task(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
571  {
572  auto & logger = private_data.llogger;
573  //... = Tomographer::Logger::makeLocalLogger(TOMO_ORIGIN, private_data.llogger.parentLogger());
574 
575  try {
576 
577  // do not execute task if an interrupt was requested.
578  if (shared_data.schedule.interrupt_requested) {
580  }
581 
582  logger.longdebug([&](std::ostream & stream) {
583  stream << "Run #" << private_data.task_id << ": querying CData for task input";
584  }) ;
585 
586  const auto input = shared_data.pcdata->getTaskInput(private_data.task_id);
587 
588  logger.longdebug([&](std::ostream & stream) {
589  stream << "Running task #" << private_data.task_id;
590  }) ;
591 
592  // construct a new task instance
593  TaskType t(input, shared_data.pcdata, logger.parentLogger());
594 
595  // not sure an std::ostream would be safe here threadwise...?
596  logger.longdebug([&](std::ostream & stream) {
597  stream << "Task #" << private_data.task_id << " set up.";
598  }) ;
599 
600  // and run it
601  t.run(shared_data.pcdata, logger.parentLogger(), &private_data);
602 
603  logger.longdebug([&](std::ostream & stream) {
604  stream << "Task #" << private_data.task_id << " finished, about to collect result.";
605  }) ;
606 
607  // collect result -- no need for protected/locked/critical access because we
608  // are the only thread which will write to this element; vector is pre-allocated
609  shared_data.results[(std::size_t)private_data.task_id] = new TaskResultType(t.stealResult());
610 
611  private_data.locker.critical_schedule([&shared_data]() {
612  // not sure we should make this into a full atomic type, because it
613  // has custom type TaskCountIntType; plus this is a real small
614  // overhead at 1x / task executed
615  ++ shared_data.schedule.num_completed;
616  }) ;
617 
618  logger.longdebug([&](std::ostream & stream) {
619  stream << "Task #" << private_data.task_id << " done.";
620  }) ;
621 
622  } catch (TaskInterruptedInnerException & ) {
623 
624  logger.debug("Task interrupted.") ;
625  return;
626 
627  } catch (...) {
628 
629  private_data._interrupt_with_inner_exception(std::current_exception());
630  return;
631 
632  }
633 
634  }
635 
638  template<typename ThreadPrivateDataType, typename ThreadSharedDataType>
639  void master_continue_monitoring_status(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
640  {
641  do {
642 
643  // check if we need to send a status report every 100ms
644  TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
645 
646  try {
647 
648  private_data.statusReportRequested();
649 
650  } catch (TaskInterruptedInnerException & ) {
651 
652  private_data.llogger.debug("[master] tasks were interrupted, returning") ;
653  return;
654 
655  } catch (...) {
656 
657  private_data.llogger.debug("[master] Exception caught inside task!") ;
658  private_data._interrupt_with_inner_exception( std::current_exception() );
659  private_data.llogger.debug("[master] Exception caught inside task -- handled.") ;
660  return;
661 
662  }
663 
664  } while (shared_data.schedule.num_active_working_threads > 0) ;
665 
666  }
667 
// run_epilog(): to be called after all workers are done.
// If any task recorded an exception (schedule.inner_exception non-empty), the
// first one is rethrown with std::rethrow_exception; a warning is logged first
// if several were recorded. Otherwise, if an interrupt was requested, the
// corresponding exception is thrown.
// NOTE(review): original line 684 (the throw for the interrupt case) is missing
// from this listing; the thrown type is not visible here -- confirm.
671  template<typename ThreadSharedDataType, typename LocalLoggerType>
672  void run_epilog(ThreadSharedDataType & shared_data, LocalLoggerType & llogger)
673  {
674  if (shared_data.schedule.inner_exception.size()) {
675  // interrupt was requested because of an inner exception, not an explicit interrupt request
676  if (shared_data.schedule.inner_exception.size() > 1) {
677  llogger.warning("Multiple exceptions caught in tasks, only the first one is re-thrown");
678  }
679  std::rethrow_exception(shared_data.schedule.inner_exception[0]);
680  }
681 
682  // if tasks were interrupted, throw the corresponding exception
683  if (shared_data.schedule.interrupt_requested) {
685  }
686  }
687 
688 }; // class TaskDispatcherBase
689 
690 
691 
692 } // namespace ThreadCommon
693 } // namespace MultiProc
694 
695 } // namespace Tomographer
696 
697 
698 
699 
700 
701 #endif
void run_worker_exit(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
A worker exits the game.
void run_worker_enter(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
New worker in the game.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
Definition: densellh.h:45
FullStatusReport< TaskStatusReportType, TaskCountIntType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
thread-local variables and stuff — also serves as TaskManagerIface
T duration_cast(T... args)
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T current_exception(T... args)
StdClockType::time_point last_report_time
Only used by master thread to detect when to send periodic status reports.
volatile std::sig_atomic_t event_counter_user
Gets incremented when user requests status report.
STL class.
unsigned int event_counter_master
Master thread increments this whenever other threads should provide status report – use unsigned to ...
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:528
#define TOMO_STATIC_ASSERT_EXPR(...)
Tool for static assertions without message.
Definition: cxxdefs.h:77
void TOMOGRAPHER_CXX_STACK_FORCE_REALIGN run_task(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
Run a given task.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Definition: loggers.h:2040
void run_epilog(ThreadSharedDataType &shared_data, LocalLoggerType &llogger)
To be called after all workers are done, to e.g. throw proper exception if an error occurred...
Some C++ utilities, with a tad of C++11 tricks.
STL class.
T move(T... args)
void longdebug(const char *fmt,...)
Generate a log message with level Logger::LONGDEBUG (printf-like syntax)
Definition: loggers.h:1876
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
STL class.
Some common definitions for multiprocessing interfaces.
T c_str(T... args)
void master_continue_monitoring_status(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
To be called by master thread only to continue monitoring for status reports.
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
T rethrow_exception(T... args)
STL class.
Provide common functionality to thread-based MultiProc implementations.
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:97