Tomographer  v5.0
Tomographer C++ Framework Documentation
multiprocthreads.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCTHREADS_H
29 #define MULTIPROCTHREADS_H
30 
#include <chrono>
#include <stdexcept>
#include <exception>
#include <string>
#include <vector>
#include <functional>
#include <algorithm>
#include <csignal>

#include <boost/exception/diagnostic_information.hpp>

#include <tomographer/tools/cxxutil.h> // tomographer_assert()
#include <tomographer/tools/loggers.h>
#include <tomographer/tools/needownoperatornew.h>
#include <tomographer/multiproc.h>

#include <thread>
#include <mutex>
// Include these here only, because on MinGW with mingw-std-threads, this
// includes windows headers which messes up tomographer/tools/logger.h by
// defining ERROR preprocessor symbols and other sh***t...
#ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD
# include <mingw.thread.h>
# include <mingw.mutex.h>
#endif
50 
51 
52 
62 namespace Tomographer {
63 namespace MultiProc {
64 namespace CxxThreads {
65 
66 
110 template<typename BaseLogger>
111 class TOMOGRAPHER_EXPORT ThreadSanitizerLogger
112  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
113 {
114 public:
115  static constexpr bool IsBaseLoggerThreadSafe = Logger::LoggerTraits<BaseLogger>::IsThreadSafe;
116 private:
117  BaseLogger & _baselogger;
118 
119  std::mutex * _mutex;
120 public:
121 
130  template<typename... MoreArgs>
131  ThreadSanitizerLogger(BaseLogger & logger, std::mutex * mutex)
132  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
133  // this one, and is fixed and cannot be changed while running.
134  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
135  _baselogger(logger),
136  _mutex(mutex)
137  {
138  }
139 
141  {
142  }
143 
144 
146  TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
147  inline void emitLog(int level, const char * origin, const std::string& msg)
148  {
149  _baselogger.emitLog(level, origin, msg);
150  }
151 
154  IsBaseLoggerThreadSafe)
155  bool filterByOrigin(int level, const char * origin) const
156  {
157  return _baselogger.filterByOrigin(level, origin);
158  }
159 
161  TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
162  inline void emitLog(int level, const char * origin, const std::string& msg)
163  {
164  std::lock_guard<std::mutex> lock(*_mutex);
165  _baselogger.emitLog(level, origin, msg);
166  }
167 
170  !IsBaseLoggerThreadSafe)
171  bool filterByOrigin(int level, const char * origin) const
172  {
173  std::lock_guard<std::mutex> lock(*_mutex);
174  return _baselogger.filterByOrigin(level, origin);
175  }
176 
177 };
178 
179 } // namespace CxxThreads
180 } // namespace MultiProc
181 
182 namespace Logger {
189 template<typename BaseLogger>
190 struct TOMOGRAPHER_EXPORT LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> >
191  : public LoggerTraits<BaseLogger>
192 {
194  enum {
197  HasOwnGetLevel = 0,
199  IsThreadSafe = 1
200  };
201 };
202 } // namespace Logger
203 
204 
205 namespace MultiProc {
206 namespace CxxThreads {
207 
208 
266 template<typename TaskType_, typename TaskCData_,
267  typename LoggerType_, typename TaskCountIntType_ = int>
268 class TOMOGRAPHER_EXPORT TaskDispatcher
269 {
270 public:
272  typedef TaskType_ TaskType;
274  typedef typename TaskType::ResultType TaskResultType;
276  typedef typename TaskType::StatusReportType TaskStatusReportType;
278  typedef TaskCData_ TaskCData;
280  typedef LoggerType_ LoggerType;
282  typedef TaskCountIntType_ TaskCountIntType;
287 
294 
295 private:
296 
297  typedef
298 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
299  std::chrono::monotonic_clock // for GCC/G++ 4.6
300 #else
302 #endif
303  StdClockType;
304 
305  struct TaskInterruptedInnerException : public std::exception {
306  std::string msg;
307  public:
308  TaskInterruptedInnerException() : msg("Task Interrupted") { }
309  virtual ~TaskInterruptedInnerException() throw() { };
310  const char * what() const throw() { return msg.c_str(); }
311  };
312  struct TaskInnerException : public std::exception {
313  std::string msg;
314  public:
315  TaskInnerException(std::string msgexc) : msg("Task raised an exception: "+msgexc) { }
316  virtual ~TaskInnerException() throw() { };
317  const char * what() const throw() { return msg.c_str(); }
318  };
319 
321  struct thread_shared_data {
322  thread_shared_data(const TaskCData * pcdata_, LoggerType & logger_,
323  TaskCountIntType num_total_runs, int num_threads)
324  : pcdata(pcdata_),
325  user_mutex(),
326  results(),
327  logger(logger_),
328  time_start(),
329  schedule(num_total_runs, num_threads),
330  status_report()
331  { }
332 
333  thread_shared_data(thread_shared_data && x)
334  : pcdata(x.pcdata),
335  user_mutex(),
336  results(std::move(x.results)),
337  logger(x.logger),
338  time_start(std::move(x.time_start)),
339  schedule(std::move(x.schedule)),
340  status_report(std::move(x.status_report))
341  { }
342 
343  const TaskCData * pcdata;
344  std::mutex user_mutex; // mutex for IO, as well as interface user interaction (status report callback fn, etc.)
345 
346  // Apparently it's better if the elements are aligned in memory:
347  // http://stackoverflow.com/a/41387941/1694896
349  // std::mutex results_mutex; // don't need mutex for accessing different elements of a std::vector
350 
351  LoggerType & logger;
352 
353  StdClockType::time_point time_start;
354 
355  struct Schedule {
356  const int num_threads;
357  TaskCountIntType num_active_working_threads;
358 
359  const TaskCountIntType num_total_runs;
360  TaskCountIntType num_completed;
361  TaskCountIntType num_launched;
362 
363  volatile std::sig_atomic_t interrupt_requested;
364  std::string inner_exception;
365 
366  std::mutex mutex;
367 
368  Schedule(TaskCountIntType num_total_runs_, int num_threads_)
369  : num_threads(num_threads_),
370  num_active_working_threads(0),
371  num_total_runs(num_total_runs_),
372  num_completed(0),
373  num_launched(0),
374  interrupt_requested(0),
375  inner_exception(),
376  mutex()
377  {
378  }
379  Schedule(Schedule && x)
380  : num_threads(std::move(x.num_threads)),
381  num_active_working_threads(x.num_active_working_threads),
382  num_total_runs(x.num_total_runs),
383  num_completed(x.num_completed),
384  num_launched(x.num_launched),
385  interrupt_requested(x.interrupt_requested),
386  inner_exception(std::move(x.inner_exception)),
387  mutex()
388  {
389  }
390  };
391  Schedule schedule;
392 
393  struct StatusReport {
394  bool underway;
395  bool initialized;
396  bool ready;
397  int periodic_interval;
398  TaskCountIntType numreportsrecieved;
399  FullStatusReportType full_report;
400  FullStatusReportCallbackType user_fn;
401 
402  volatile std::sig_atomic_t counter_user;
403  volatile std::sig_atomic_t counter_periodic;
404 
405  std::mutex mutex;
406 
407  StatusReport()
408  : underway(false),
409  initialized(false),
410  ready(false),
411  periodic_interval(-1),
412  numreportsrecieved(0),
413  full_report(),
414  user_fn(),
415  counter_user(0),
416  counter_periodic(0),
417  mutex()
418  {
419  }
421  : underway(x.underway),
422  initialized(x.initialized),
423  ready(x.ready),
424  periodic_interval(x.periodic_interval),
425  numreportsrecieved(x.numreportsrecieved),
426  full_report(std::move(x.full_report)),
427  user_fn(std::move(x.user_fn)),
428  counter_user(x.counter_user),
429  counter_periodic(x.counter_periodic),
430  mutex()
431  {
432  }
433  };
434  StatusReport status_report;
435 
436  template<typename Struct, typename Fn>
437  void with_lock(Struct & s, Fn fn) {
438  std::lock_guard<std::mutex> lock(s.mutex);
439  fn(s);
440  }
441  };
442 
444  struct thread_private_data
445  {
446  const int thread_id;
447 
448  thread_shared_data * shared_data;
449 
450  TaskLoggerType & logger;
451 
452  TaskCountIntType task_id;
453  int local_status_report_counter_user;
454  int local_status_report_counter_periodic;
455 
456  thread_private_data(int thread_id_, thread_shared_data * shared_data_, TaskLoggerType & logger_)
457  : thread_id(thread_id_),
458  shared_data(shared_data_),
459  logger(logger_),
460  task_id(-1),
461  local_status_report_counter_user(0),
462  local_status_report_counter_periodic(0)
463  {
464  }
465 
466  inline bool statusReportRequested() const
467  {
468  if (shared_data->schedule.interrupt_requested) {
469  logger.longdebug("CxxThreads::thread_private_data::statusReportRequested()",
470  "tasks interrupt has been requested");
471  throw TaskInterruptedInnerException();
472  }
473 
474  //
475  // if we're the master thread, we have some admin to do.
476  //
477  if (thread_id == 0) {
478  // Update the status_report_counter according to whether
479  // we should provoke a periodic status report
480  if (shared_data->status_report.periodic_interval > 0 && shared_data->status_report.user_fn) {
481  _master_thread_update_status_report_periodic_interval_counter();
482  }
483 
484  // if we're the master thread, then also check if there is a status report ready
485  // to be sent.
486  if (shared_data->status_report.ready) {
487  logger.longdebug("Tomographer::MultiProc::CxxThreads::thread_private_data::statusReportRequested()",
488  "Status report is ready.");
489 
490  // guard this block for status_report access
491  std::lock(shared_data->status_report.mutex, shared_data->user_mutex);
492  std::lock_guard<std::mutex> lck1(shared_data->status_report.mutex, std::adopt_lock);
493  std::lock_guard<std::mutex> lck2(shared_data->user_mutex, std::adopt_lock);
494 
495  // call user-defined status report handler
496  shared_data->status_report.user_fn(std::move(shared_data->status_report.full_report));
497  // all reports recieved: done --> reset our status_report flags
498  shared_data->status_report.numreportsrecieved = 0;
499  shared_data->status_report.underway = false;
500  shared_data->status_report.initialized = false;
501  shared_data->status_report.ready = false;
502  shared_data->status_report.full_report.workers_running.clear();
503  shared_data->status_report.full_report.workers_reports.clear();
504  }
505  } // master thread
506 
507  return local_status_report_counter_user != (int)shared_data->status_report.counter_user ||
508  local_status_report_counter_periodic != (int)shared_data->status_report.counter_periodic;
509  }
510 
511  // internal use only:
512  inline void _master_thread_update_status_report_periodic_interval_counter() const
513  {
514  shared_data->status_report.counter_periodic = (std::sig_atomic_t) (
515  std::chrono::duration_cast<std::chrono::milliseconds>(
516  StdClockType::now().time_since_epoch()
517  ).count() / shared_data->status_report.periodic_interval
518  ) ;
519  }
520 
521  inline void submitStatusReport(const TaskStatusReportType &statreport)
522  {
523  if (local_status_report_counter_user == (int)shared_data->status_report.counter_user &&
524  local_status_report_counter_periodic == (int)shared_data->status_report.counter_periodic) {
525  // error: task submitted unsollicited report
526  logger.warning("CxxThreads TaskDispatcher/taskmanageriface", "Task submitted unsollicited status report");
527  return;
528  }
529 
530  std::lock_guard<std::mutex> lockguard(shared_data->status_report.mutex) ;
531 
532  // we've reacted to the given "signal"
533  local_status_report_counter_user = shared_data->status_report.counter_user;
534  local_status_report_counter_periodic = shared_data->status_report.counter_periodic;
535 
536  // access to the local logger is fine as a different mutex is used
537  logger.longdebug("CxxThreads TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
538  stream << "status report received for thread #" << thread_id << ", treating it ... "
539  << "numreportsrecieved=" << shared_data->status_report.numreportsrecieved
540  << " num_active_working_threads=" << shared_data->schedule.num_active_working_threads ;
541  });
542 
543  //
544  // If we're the first reporting thread, we need to initiate the status reporing
545  // procedure and initialize the general data
546  //
547  if (!shared_data->status_report.initialized) {
548 
549  //
550  // Check that we indeed have to submit a status report.
551  //
552  if (shared_data->status_report.underway) {
553  // status report already underway!
554  logger.warning("CxxThreads TaskDispatcher/taskmanageriface", "status report already underway!");
555  return;
556  }
557  if (!shared_data->status_report.user_fn) {
558  // no user handler set
559  logger.warning("CxxThreads TaskDispatcher/taskmanageriface",
560  "no user status report handler set! Call setStatusReportHandler() first.");
561  return;
562  }
563 
564  shared_data->status_report.underway = true;
565  shared_data->status_report.initialized = true;
566  shared_data->status_report.ready = false;
567 
568  // initialize status report object & overall data
569  shared_data->status_report.full_report = FullStatusReportType();
570  shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
571  shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
572  shared_data->status_report.full_report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
573  StdClockType::now() - shared_data->time_start
574  ).count() * 1e-3;
575  int num_threads = shared_data->schedule.num_threads;
576 
577  // initialize task-specific reports
578  // fill our lists with default-constructed values & set all running to false.
579  shared_data->status_report.full_report.workers_running.clear();
580  shared_data->status_report.full_report.workers_reports.clear();
581  shared_data->status_report.full_report.workers_running.resize((std::size_t)num_threads, false);
582  shared_data->status_report.full_report.workers_reports.resize((std::size_t)num_threads);
583 
584  shared_data->status_report.numreportsrecieved = 0;
585 
586  logger.debug("CxxThreads TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
587  stream << "vectors resized to workers_running.size()="
588  << shared_data->status_report.full_report.workers_running.size()
589  << " and workers_reports.size()="
590  << shared_data->status_report.full_report.workers_reports.size()
591  << ".";
592  });
593  } // status_report.initialized
594 
595  //
596  // Report the data corresponding to this thread.
597  //
598  logger.debug("CxxThreads TaskDispatcher/taskmanageriface", "thread_id=%ld, workers_reports.size()=%ld",
599  (long)thread_id, (long)shared_data->status_report.full_report.workers_reports.size());
600 
601  tomographer_assert(0 <= thread_id &&
602  (std::size_t)thread_id < shared_data->status_report.full_report.workers_reports.size());
603 
604  shared_data->status_report.full_report.workers_running[(std::size_t)thread_id] = true;
605  shared_data->status_report.full_report.workers_reports[(std::size_t)thread_id] = statreport;
606 
607  ++ shared_data->status_report.numreportsrecieved;
608 
609  if (shared_data->status_report.numreportsrecieved == shared_data->schedule.num_active_working_threads) {
610  //
611  // the report is ready to be transmitted to the user: go!
612  //
613  // Don't send it quite yet, queue it for the master thread to send. We add
614  // this guarantee so that the status report handler can do things which only
615  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
616  shared_data->status_report.ready = true;
617  }
618 
619  } // submitStatusReport()
620 
621  }; // thread_private_data
622 
623  thread_shared_data shared_data;
624 
625 public:
637  TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_,
638  TaskCountIntType num_total_runs_,
639  int num_threads_ = (int)std::thread::hardware_concurrency())
640  : shared_data(pcdata_, logger_, num_total_runs_, num_threads_)
641  {
642  }
643 
645  : shared_data(std::move(other.shared_data))
646  {
647  }
648 
649  ~TaskDispatcher()
650  {
651  for (auto r : shared_data.results) {
652  if (r != NULL) {
653  delete r;
654  }
655  }
656  }
657 
662  void run()
663  {
664  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "Let's go!");
665  shared_data.time_start = StdClockType::now();
666 
667  shared_data.results.resize((std::size_t)shared_data.schedule.num_total_runs, NULL);
668 
669  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "preparing for parallel runs");
670 
671  typedef typename thread_shared_data::Schedule Schedule;
672 
673  auto worker_fn_id = [&](const int thread_id) noexcept(true) {
674 
675  // construct a thread-safe logger we can use
676  TaskLoggerType threadsafelogger(shared_data.logger, & shared_data.user_mutex);
677 
678  thread_private_data privdat(thread_id, & shared_data, threadsafelogger);
679 
680  // not sure an std::ostream would be safe here threadwise...?
681  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::run()",
682  "Thread #%d: thread-safe logger and private thread data set up", thread_id);
683 
684  {
685  // active working region. This thread takes care of handling tasks.
686 
687  shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
688  ++ schedule.num_active_working_threads;
689  });
690  auto _f0 = Tools::finally([&](){
691  shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
692  -- schedule.num_active_working_threads;
693  });
694  });
695 
696  for ( ;; ) {
697  // continue doing stuff until we stop
698 
699  if (shared_data.schedule.interrupt_requested) {
700  break;
701  }
702 
703  // get new task to perform
704  shared_data.with_lock(shared_data.schedule, [&privdat](Schedule & schedule) {
705  if (schedule.num_launched == schedule.num_total_runs) {
706  privdat.task_id = -1; // all tasks already launched -> nothing else to do
707  return;
708  }
709  privdat.task_id = schedule.num_launched;
710  ++ schedule.num_launched ;
711  }) ;
712 
713  if ( privdat.task_id < 0 ) {
714  // all tasks already launched -> nothing else to do
715  break;
716  }
717 
718  // run this task.
719 
720  {
721  std::lock_guard<std::mutex> lk2(shared_data.status_report.mutex);
722  privdat.local_status_report_counter_user = shared_data.status_report.counter_user;
723  privdat.local_status_report_counter_periodic = shared_data.status_report.counter_periodic;
724  }
725 
726  try {
727 
728  _run_task(privdat, shared_data) ;
729 
730  } catch (TaskInterruptedInnerException & exc) {
731  privdat.logger.debug("CxxThreads::run()/worker", "Tasks interrupted.") ;
732  std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
733  shared_data.schedule.interrupt_requested = true;
734  break;
735  } catch (...) {
736  privdat.logger.debug("CxxThreads::run()/worker", "Exception caught inside task! "
737  + boost::current_exception_diagnostic_information()) ;
738  std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
739  shared_data.schedule.interrupt_requested = true;
740  shared_data.schedule.inner_exception += std::string("Exception caught inside task: ")
741  + boost::current_exception_diagnostic_information() + "\n";
742  break;
743  }
744 
745  { std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
746  ++ shared_data.schedule.num_completed;
747  }
748 
749  } // for(;;)
750 
751  } // end of active working region, thread on longer serves to run tasks
752  // (--num_active_working_threads is executed at this point)
753 
754  // only master thread should make sure it continues to serve status report requests
755  if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
756 
757  const int sleep_val = std::max(shared_data.status_report.periodic_interval, 200);
758 
759  while (shared_data.schedule.num_active_working_threads > 0) {
760 
762  try {
763  privdat.statusReportRequested();
764  } catch (...) {
765  privdat.logger.debug("CxxThreads::run()", "[master] Exception caught inside task!") ;
766  std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
767  shared_data.schedule.interrupt_requested = true;
768  shared_data.schedule.inner_exception += std::string("Exception caught inside task: ")
769  + boost::current_exception_diagnostic_information() + "\n";
770  privdat.logger.debug("CxxThreads::run()", "[master] Exception caught inside task -- handled.") ;
771  break;
772  }
773 
774  }
775  }
776 
777  } ; // worker_fn_id
778 
779  //
780  // now, prepare & launch the workers
781  //
782 
783  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "About to launch threads");
784 
785  std::vector<std::thread> threads;
786 
787  // thread_id = 0 is reserved for ourselves.
788  for (int thread_id = 1; thread_id < shared_data.schedule.num_threads; ++thread_id) {
789  threads.push_back( std::thread( [thread_id,worker_fn_id]() { // do NOT capture thread_id by reference!
790  worker_fn_id(thread_id);
791  } ) );
792  }
793 
794  // also run stuff as master thread
795  worker_fn_id(0);
796 
797  std::for_each(threads.begin(), threads.end(), [](std::thread & thread) { thread.join(); }) ;
798 
799  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "Threads finished");
800 
801  if (shared_data.schedule.inner_exception.size()) {
802  // interrupt was requested because of an inner exception, not an explicit interrupt request
803  throw std::runtime_error(shared_data.schedule.inner_exception);
804  }
805 
806  // if tasks were interrupted, throw the corresponding exception
807  if (shared_data.schedule.interrupt_requested) {
809  }
810 
811  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "Done.");
812  } // run()
813 
814 
815 
819  inline TaskCountIntType numTaskRuns() const {
820  return shared_data.schedule.num_total_runs;
821  }
822 
827  return shared_data.results;
828  }
829 
833  inline const TaskResultType & collectedTaskResult(std::size_t k) const {
834  return *shared_data.results[(std::size_t)k];
835  }
836 
837 
838 private:
839  void _run_task(thread_private_data & privdat, thread_shared_data & shared_data)
841  {
842 
843  // do not execute task if an interrupt was requested.
844  if (shared_data.schedule.interrupt_requested) {
845  return;
846  }
847 
848  // not sure an std::ostream would be safe here threadwise...?
849  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
850  "Run #%lu: querying CData for task input", (unsigned long)privdat.task_id);
851 
852  // See OMP implementation: getTaskInput() is not thread protected
853  //
854  // const auto input = [](thread_shared_data & x, task_id) {
855  // std::lock_guard<std::mutex> lck(x.user_mutex);
856  // return x.pcdata->getTaskInput(task_id);
857  // } (shared_data, privdat.task_id);
858  const auto input = shared_data.pcdata->getTaskInput(privdat.task_id);
859 
860  // not sure an std::ostream would be safe here threadwise...?
861  privdat.logger.debug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
862  "Running task #%lu ...", (unsigned long)privdat.task_id);
863 
864  // construct a new task instance
865  TaskType t(input, shared_data.pcdata, privdat.logger);
866 
867  // not sure an std::ostream would be safe here threadwise...?
868  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
869  "Task #%lu set up.", (unsigned long)privdat.task_id);
870 
871  // and run it
872  t.run(shared_data.pcdata, privdat.logger, &privdat);
873 
874  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
875  "Task #%lu finished, about to collect result.", (unsigned long)privdat.task_id);
876 
877  // collect result
878  shared_data.results[(std::size_t)privdat.task_id] = new TaskResultType(t.stealResult());
879 
880  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()", "task done") ;
881  }
882 
883 
884 public:
885 
896  inline void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
897  {
898  std::lock_guard<std::mutex> lck(shared_data.status_report.mutex) ;
899  shared_data.status_report.user_fn = fnstatus;
900  }
901 
912  inline void requestStatusReport()
913  {
914  //
915  // This function can be called from a signal handler. We essentially can't do
916  // anything here because the state of the program can be pretty much anything,
917  // including inside a malloc() or thread lock.
918  //
919  // So just increment an atomic int.
920  //
921 
922  ++ shared_data.status_report.counter_user;
923  }
924 
932  inline void requestPeriodicStatusReport(int milliseconds)
933  {
934  std::lock_guard<std::mutex> lck(shared_data.status_report.mutex) ;
935  shared_data.status_report.periodic_interval = milliseconds;
936  }
937 
949  inline void requestInterrupt()
950  {
951  // set the atomic int
952  shared_data.interrupt_requested = 1;
953  }
954 
955 
956 }; // class TaskDispatcher
957 
958 
959 template<typename TaskType_, typename TaskCData_,
960  typename LoggerType_, typename TaskCountIntType_ = int>
961 inline
963 mkTaskDispatcher(TaskCData_ * pcdata_,
964  LoggerType_ & logger_,
965  TaskCountIntType_ num_total_runs_,
966  int num_threads_ = (int)std::thread::hardware_concurrency())
967 {
969  pcdata_, logger_, num_total_runs_, num_threads_
970  ) ;
971 }
972 
973 
974 
975 } // namespace CxxThreads
976 } // namespace MultiProc
977 
978 } // namespace Tomographer
979 
980 
981 
982 
983 
984 #endif
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
FullStatusReport< TaskStatusReportType, TaskCountIntType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
Base namespace for the Tomographer project.
Definition: densellh.h:45
TaskDispatcher(TaskCData *pcdata_, LoggerType &logger_, TaskCountIntType num_total_runs_, int num_threads_=(int) std::thread::hardware_concurrency())
Task dispatcher constructor.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
STL namespace.
T sleep_for(T... args)
tomo_internal::FinalAction< F > finally(F f)
implementation of a finally clause, somewhat like in Python
Definition: cxxutil.h:83
T duration_cast(T... args)
T end(T... args)
Base logger class.
Definition: loggers.h:437
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
T hardware_concurrency(T... args)
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
STL class.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
void requestStatusReport()
Request a status report.
T push_back(T... args)
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:433
T join(T... args)
T lock(T... args)
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
void warning(const char *origin, const char *fmt,...)
emit a warning message
Definition: loggers.h:1027
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
Some C++ utilities, with a tad of C++11 tricks.
void debug(const char *origin, const char *fmt,...)
emit an debug message
Definition: loggers.h:1073
T max(T... args)
STL class.
T move(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
STL class.
void longdebug(const char *origin, const char *fmt,...)
emit a very verbose debugging message
Definition: loggers.h:1096
Some common definitions for multiprocessing interfaces.
T begin(T... args)
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:357
TaskType::ResultType TaskResultType
The task result type.
T c_str(T... args)
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Dispatches tasks to parallel threads using C++11 native threads.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
T for_each(T... args)
void requestInterrupt()
Request an immediate interruption of the tasks.
STL class.
STL class.
#define tomographer_assert(...)
Assertion test macro.
Definition: cxxdefs.h:84
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:85
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.