Tomographer  v3.0
Tomographer C++ Framework Documentation
multiprocomp.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCOMP_H
29 #define MULTIPROCOMP_H
30 
31 #include <csignal>
32 #include <chrono>
33 #include <thread>
34 #include <stdexcept>
35 
#ifdef _OPENMP
#include <omp.h>
#else
// Fallback shims for builds without OpenMP: behave as the single thread (#0)
// of a team of one, so the dispatcher logic below works unchanged when
// everything runs serially.
inline constexpr int omp_get_thread_num() { return 0; }
inline constexpr int omp_get_num_threads() { return 1; }
#endif
42 
43 #include <boost/exception/diagnostic_information.hpp>
44 
46 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
48 #include <tomographer/multiproc.h>
49 
// Portable "sleep for X milliseconds" helper used by the periodic status
// report loop in TaskDispatcher::run().
#ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP
// use MS Window's Sleep() function
#include <windows.h>
#define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x))
#else
// normal C++11 API function, not available on mingw32 w/ win threads
#define TOMOGRAPHER_SLEEP_FOR_MS(x) std::this_thread::sleep_for(std::chrono::milliseconds((x)))
#endif
58 
59 
69 namespace Tomographer {
70 namespace MultiProc {
71 namespace OMP {
72 
73 namespace tomo_internal {
74 
89 template<typename BaseLogger, bool baseLoggerIsThreadSafe>
90 struct ThreadSanitizerLoggerHelper
91 {
92  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
93  {
94  bool got_exception = false;
95  std::string exception_str;
96 #pragma omp critical
97  {
98  //fprintf(stderr, "ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- OMP CRITICAL\n", level, origin, msg.c_str());
99  try {
100  baselogger.emitLog(level, origin, msg);
101  } catch (...) {
102  got_exception = true;
103  exception_str = std::string("Caught exception in emitLog: ") + boost::current_exception_diagnostic_information();
104  }
105  }
106  if (got_exception) {
107  throw std::runtime_error(exception_str);
108  }
109  }
110  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
111  {
112  bool got_exception = false;
113  std::string exception_str;
114 
115  bool ok = true;
116 #pragma omp critical
117  {
118  //fprintf(stderr, "ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- OMP CRITICAL\n", level, origin);
119  try {
120  ok = baselogger.filterByOrigin(level, origin);
121  } catch (...) {
122  got_exception = true;
123  exception_str = std::string("Caught exception in filterByOrigni: ")
124  + boost::current_exception_diagnostic_information();
125  }
126  }
127  if (got_exception) {
128  throw std::runtime_error(exception_str);
129  }
130  return ok;
131  }
132 };
133 
134 //
135 // specialize the helper for when logging to a thread-safe base logger. No critical
136 // section needed because the logger is already thread-safe.
137 //
138 template<typename BaseLogger>
139 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
140  {
141  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
142  {
143  //fprintf(stderr, "ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- NORMAL\n", level, origin, msg.c_str());
144  baselogger.emitLog(level, origin, msg);
145  }
146  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
147  {
148  //fprintf(stderr, "ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- NORMAL\n", level, origin);
149  return baselogger.filterByOrigin(level, origin);
150  }
151 };
152 
153 } // namespace tomo_internal
154 
155 
207 template<typename BaseLogger>
208 class ThreadSanitizerLogger : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
209 {
210  BaseLogger & _baselogger;
211 public:
212 
221  template<typename... MoreArgs>
222  ThreadSanitizerLogger(BaseLogger & logger, MoreArgs&&...)
223  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
224  // this one, and is fixed and cannot be changed while running.
225  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
226  _baselogger(logger)
227  {
228  // when you have to debug the debug log mechanism... lol
229  //printf("ThreadSanitizerLogger(): object created\n");
230  //_baselogger.debug("ThreadSanitizerLogger()", "log from constructor.");
231  //emitLog(Logger::DEBUG, "ThreadSanitizerLogger!", "emitLog from constructor");
232  //LoggerBase<ThreadSanitizerLogger<BaseLogger> >::debug("ThreadSanitizerLogger", "debug from constructor");
233  }
234 
236  {
237  }
238 
239 
241  inline void emitLog(int level, const char * origin, const std::string& msg)
242  {
243  //printf("ThreadSanitizerLogger::emitLog(%d, %s, %s)\n", level, origin, msg.c_str());
244  tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
246  ::emitLog(
247  _baselogger, level, origin, msg
248  );
249  }
250 
252  template<bool dummy = true>
254  filterByOrigin(int level, const char * origin) const
255  {
256  return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
258  ::filterByOrigin(
259  _baselogger, level, origin
260  );
261  }
262 };
263 
264 } // namespace OMP
265 } // namespace MultiProc
266 
267 namespace Logger {
/** \brief Specialized Logger::LoggerTraits for ThreadSanitizerLogger.
 *
 * Inherits the base logger's traits, but overrides the two below.
 */
template<typename BaseLogger>
struct LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> > : public LoggerTraits<BaseLogger>
{
  enum {
    //! The wrapper's level is fixed at construction (see its constructor), so
    //! it does not provide its own getLevel().
    HasOwnGetLevel = 0,
    //! The whole point of the wrapper: it is safe to use from several threads.
    IsThreadSafe = 1
  };
};
286 } // namespace Logger
287 
288 
289 namespace MultiProc {
290 namespace OMP {
291 
292 
349 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
350  typename LoggerType_, typename CountIntType_ = int,
351  typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
353 {
354 public:
356  typedef TaskType_ TaskType;
358  typedef typename TaskType::StatusReportType TaskStatusReportType;
360  typedef TaskCData_ TaskCData;
362  typedef ResultsCollector_ ResultsCollector;
364  typedef LoggerType_ LoggerType;
366  typedef CountIntType_ CountIntType;
368  typedef TaskLoggerType_ TaskLoggerType;
371 
378 
379 private:
380 
381  typedef
382 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
383  std::chrono::monotonic_clock // for GCC/G++ 4.6
384 #else
386 #endif
387  StdClockType;
388 
389  struct TaskInterruptedInnerException : public std::exception {
390  std::string msg;
391  public:
392  TaskInterruptedInnerException() : msg("Task Interrupted") { }
393  virtual ~TaskInterruptedInnerException() throw() { };
394  const char * what() const throw() { return msg.c_str(); }
395  };
396  struct TaskInnerException : public std::exception {
397  std::string msg;
398  public:
399  TaskInnerException(std::string msgexc) : msg("Task raised an exception: "+msgexc) { }
400  virtual ~TaskInnerException() throw() { };
401  const char * what() const throw() { return msg.c_str(); }
402  };
403 
  /** \brief Data shared between all worker threads of the dispatcher.
   *
   * One instance lives in the TaskDispatcher; all threads access it through a
   * pointer.  Mutation happens inside OMP critical sections, except for the
   * two \c sig_atomic_t flags which may be set from a signal handler.
   */
  struct thread_shared_data {
    thread_shared_data(const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
                       CountIntType num_total_runs_, CountIntType n_chunk_)
      : pcdata(pcdata_),
        results(results_),
        logger(logger_),
        time_start(),
        status_report_underway(false),
        status_report_initialized(false),
        status_report_ready(false),
        status_report_counter(0),
        status_report_periodic_interval(-1),
        status_report_numreportsrecieved(0),
        status_report_full(),
        status_report_user_fn(),
        interrupt_requested(0),
        num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
        num_active_working_threads(0)
    { }

    // constant, shared data all tasks may read (not owned)
    const TaskCData * pcdata;
    // collects the tasks' results (not owned)
    ResultsCollector * results;
    // base logger; only use within critical sections unless it is thread-safe
    LoggerType & logger;

    // time at which run() started; used for the "elapsed" field of reports
    StdClockType::time_point time_start;

    // true while a status report is being collected from the workers
    bool status_report_underway;
    // true once the first reporting thread has set up status_report_full
    bool status_report_initialized;
    // true when all workers have reported and the master thread should deliver
    bool status_report_ready;
    // bumped to signal "please report"; sig_atomic_t so a signal handler may touch it
    volatile std::sig_atomic_t status_report_counter;
    // period in ms for automatic reports; <= 0 disables periodic reporting
    int status_report_periodic_interval;
    // number of worker reports received so far for the report in progress
    // (note: "recieved" misspelling kept — the name is used throughout this file)
    CountIntType status_report_numreportsrecieved;

    // the report being assembled / last assembled
    FullStatusReportType status_report_full;
    // user callback invoked with the completed report (may be empty)
    FullStatusReportCallbackType status_report_user_fn;

    // nonzero requests all tasks to stop; sig_atomic_t for signal-handler safety
    volatile std::sig_atomic_t interrupt_requested;

    // total number of task instances to run
    CountIntType num_total_runs;
    // chunk size for the dynamic OMP scheduling of the task loop
    CountIntType n_chunk;
    // number of tasks finished so far
    CountIntType num_completed;

    // number of threads currently executing a task
    CountIntType num_active_working_threads;
  };
450  struct thread_private_data
451  {
452  thread_shared_data * shared_data;
453 
454  TaskLoggerType * logger;
455 
456  CountIntType kiter;
457  CountIntType local_status_report_counter;
458 
459  inline bool statusReportRequested() const
460  {
461  if (shared_data->interrupt_requested) {
462  throw TaskInterruptedInnerException();
463  }
464 
465  //
466  // if we're the master thread, we have some admin to do.
467  //
468  // NOTE: #pragma omp master prevents us from throwing an exception! (at least on clang++3.8)
469  if (omp_get_thread_num() == 0) {
470  // Update the status_report_counter according to whether
471  // we should provoke a periodic status report
472  if (shared_data->status_report_periodic_interval > 0) {
473  _master_thread_update_status_report_periodic_interval_counter();
474  }
475 
476  // if we're the master thread, then also check if there is a status report ready
477  // to be sent.
478  if (shared_data->status_report_ready) {
479  bool got_exception = false;
480  std::string exception_str;
481 #pragma omp critical
482  {
483  try {
484  // call user-defined status report handler
485  shared_data->status_report_user_fn(shared_data->status_report_full);
486  // all reports recieved: done --> reset our status_report_* flags
487  shared_data->status_report_numreportsrecieved = 0;
488  shared_data->status_report_underway = false;
489  shared_data->status_report_initialized = false;
490  shared_data->status_report_ready = false;
491  shared_data->status_report_full.workers_running.clear();
492  shared_data->status_report_full.workers_reports.clear();
493  } catch (...) {
494  got_exception = true;
495  exception_str = std::string("Caught exception in status reporting callback: ")
496  + boost::current_exception_diagnostic_information();
497  }
498  }
499  if (got_exception) {
500  throw TaskInnerException(exception_str);
501  }
502  }
503  } // omp master
504 
505  return (int)local_status_report_counter != (int)shared_data->status_report_counter;
506  }
507 
508  // internal use only:
509  inline void _master_thread_update_status_report_periodic_interval_counter() const
510  {
511  shared_data->status_report_counter = (
513  StdClockType::now().time_since_epoch()
514  ).count() / shared_data->status_report_periodic_interval) & 0x00FFFFFF
515  ) << 6;
516  // the (x << 6) (equivalent to (x * 64)) allows individual increments from
517  // unrelated additional requestStatusReport() to be taken into account (allows 64
518  // such additional requests per periodic status report)
519  }
520 
521  inline void submitStatusReport(const TaskStatusReportType &statreport)
522  {
523  if ((int)local_status_report_counter == (int)shared_data->status_report_counter) {
524  // error: task submitted unsollicited report
525  logger->warning("OMP TaskDispatcher/taskmanageriface", "Task submitted unsollicited status report");
526  return;
527  }
528 
529 
530  bool got_exception = false;
531  std::string exception_str;
532 #pragma omp critical
533  {
534  try {
535  bool ok = true; // whether to proceed or not
536 
537  // we've reacted to the given "signal"
538  local_status_report_counter = shared_data->status_report_counter;
539 
540  // add our status report to being-prepared status report in the shared data
541  int threadnum = omp_get_thread_num();
542 
543  // Important: access the original logger, not the thread-safe-wrapped logger!!
544  // Otherwise this could lead to deadlocks because of nested critical blocks.
545  shared_data->logger.longdebug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
546  stream << "status report received for thread #" << threadnum << ", treating it ...";
547  });
548 
549  //
550  // If we're the first reporting thread, we need to initiate the status reporing
551  // procedure and initialize the general data
552  //
553  if (!shared_data->status_report_initialized) {
554 
555  //
556  // Check that we indeed have to submit a status report.
557  //
558  if (shared_data->status_report_underway) {
559  // status report already underway!
560  // Important: access the original logger, not the thread-safe-wrapped logger!!
561  // Otherwise this could lead to deadlocks because of nested critical blocks.
562  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
563  "status report already underway!");
564  ok = false;
565  }
566  if (!shared_data->status_report_user_fn) {
567  // no user handler set
568  // Important: access the original logger, not the thread-safe-wrapped logger!!
569  // Otherwise this could lead to deadlocks because of nested critical blocks.
570  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
571  "no user status report handler set!"
572  " call setStatusReportHandler() first.");
573  ok = false;
574  }
575 
576  // since we can't return out of a critical section, we use an if block.
577  if (ok) {
578 
579  shared_data->status_report_underway = true;
580  shared_data->status_report_initialized = true;
581  shared_data->status_report_ready = false;
582 
583  // initialize status report object & overall data
584  shared_data->status_report_full = FullStatusReportType();
585  shared_data->status_report_full.num_completed = shared_data->num_completed;
586  shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
587  shared_data->status_report_full.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
588  StdClockType::now() - shared_data->time_start
589  ).count() * 1e-3;
590  // shared_data->status_report_full.num_active_working_threads = shared_data->num_active_working_threads;
591  int num_threads = omp_get_num_threads();
592  // shared_data->status_report_full.num_threads = num_threads;
593 
594  // initialize task-specific reports
595  // fill our lists with default-constructed values & set all running to false.
596  shared_data->status_report_full.workers_running.clear();
597  shared_data->status_report_full.workers_reports.clear();
598  shared_data->status_report_full.workers_running.resize(num_threads, false);
599  shared_data->status_report_full.workers_reports.resize(num_threads);
600 
601  shared_data->status_report_numreportsrecieved = 0;
602 
603  // Important: access the original logger, not the thread-safe-wrapped logger!!
604  // Otherwise this could lead to deadlocks because of nested critical blocks.
605  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
606  stream << "vectors resized to workers_running.size()="
607  << shared_data->status_report_full.workers_running.size()
608  << " and workers_reports.size()="
609  << shared_data->status_report_full.workers_reports.size()
610  << ".";
611  });
612  }
613 
614  } // status_report_initialized
615 
616  // if we're the first reporting thread, then maybe ok was set to false above, so
617  // check again.
618  if (ok) {
619 
620  //
621  // Report the data corresponding to this thread.
622  //
623  // Important: access the original logger, not the thread-safe-wrapped logger!!
624  // Otherwise this could lead to deadlocks because of nested critical blocks.
625  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", "threadnum=%ld, workers_reports.size()=%ld",
626  (long)threadnum, (long)shared_data->status_report_full.workers_reports.size());
627 
628  tomographer_assert(0 <= threadnum &&
629  (std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
630 
631  shared_data->status_report_full.workers_running[threadnum] = true;
632  shared_data->status_report_full.workers_reports[threadnum] = statreport;
633 
634  ++ shared_data->status_report_numreportsrecieved;
635 
636  if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
637  //
638  // the report is ready to be transmitted to the user: go!
639  //
640  // Don't send it quite yet, queue it for the master thread to send. We add
641  // this guarantee so that the status report handler can do things which only
642  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
643  shared_data->status_report_ready = true;
644  }
645  } // if ok
646  } catch (...) {
647  // std::string msg(boost::current_exception_diagnostic_information());
648  // fprintf(stderr, "CAUGHT AN EXCEPTION: %s\n", msg.c_str());
649 
650  // if an exception occurred propagate it out to end the task and cause an
651  // interruption. The user may not know what caused the interrupt. Don't
652  // terminate or cause a huge fuss, as this might be actually controlled
653  // (e.g. boost::python::error_already_set) Also, the logger itself may have
654  // caused the exception, so don't use the logger here!!:
655  //shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
656  // stream << "Error while processing status report, exception caught: "
657  // << boost::current_exception_diagnostic_information();
658  // });
659  got_exception = true;
660  exception_str = std::string("Caught exception while processing status report: ")
661  + boost::current_exception_diagnostic_information();
662  }
663  } // omp critical
664  if (got_exception) {
665  throw TaskInnerException(exception_str);
666  }
667  }
668  };
669 
  //! The one instance of shared data which all worker threads point into.
  thread_shared_data shared_data;

public:
  /** \brief Task dispatcher constructor.
   *
   * \param pcdata_ constant, shared data the tasks may access (not owned)
   * \param results_ the results collector (not owned)
   * \param logger_ base logger (wrapped in a thread-safe logger per task)
   * \param num_total_runs_ total number of task instances to run
   * \param n_chunk_ chunk size for the dynamic OMP scheduling
   */
  TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
                 CountIntType num_total_runs_, CountIntType n_chunk_)
    : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
  {
  }
699 
704  void run()
705  {
706  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "Let's go!");
707  shared_data.time_start = StdClockType::now();
708 
709  shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
710 
711  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "preparing for parallel runs");
712 
713 #ifndef _OPENMP
714  shared_data.logger.warning("MultiProc::OMP::TaskDispatcher::run()", "OpenMP is disabled; tasks will run serially.");
715 #endif
716 
717  // declaring these as "const" causes a weird compiler error
718  // "`n_chunk' is predetermined `shared' for `shared'"
719  CountIntType num_total_runs = shared_data.num_total_runs;
720  CountIntType n_chunk = shared_data.n_chunk;
721  (void)n_chunk; // silence "unused variable" warning when compiling without OMP support
722 
723  CountIntType k = 0;
724 
725  thread_shared_data *shdat = &shared_data;
726  thread_private_data privdat;
727 
728  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "About to start parallel section");
729 
730  int num_active_parallel = 0;
731 
732  std::string inner_exception;
733 
734 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk, num_active_parallel, inner_exception)
735  {
736  privdat.shared_data = shdat;
737  privdat.kiter = 0;
738 
739 #pragma omp atomic
740  ++num_active_parallel;
741 
742  //
743  // The main, parallel FOR loop:
744  //
745 #pragma omp for schedule(dynamic,n_chunk) nowait
746  for (k = 0; k < num_total_runs; ++k) {
747 
748  try {
749 
750  // make separate function call, so that we can tell GCC to realign the stack on
751  // platforms which don't do that automatically (yes, MinGW, it's you I'm looking
752  // at)
753  _run_task(privdat, shdat, k);
754 
755  } catch (...) {
756 #pragma omp critical
757  {
758  shdat->interrupt_requested = 1;
759  inner_exception += std::string("Exception caught inside task: ")
760  + boost::current_exception_diagnostic_information() + "\n";
761  }
762  }
763 
764  } // omp for
765 
766 #pragma omp atomic
767  --num_active_parallel;
768 
769 #pragma omp master
770  {
771  if (shdat->status_report_periodic_interval > 0) {
772  // master thread should continue providing regular status updates
773  while (num_active_parallel > 0) {
774  TOMOGRAPHER_SLEEP_FOR_MS(shdat->status_report_periodic_interval);
775  privdat._master_thread_update_status_report_periodic_interval_counter();
776  }
777  }
778  }
779 
780  } // omp parallel
781 
782  if (inner_exception.size()) {
783  // interrupt was requested because of an inner exception, not an explicit interrupt request
784  throw std::runtime_error(inner_exception);
785  }
786 
787  if (shared_data.interrupt_requested) {
789  }
790 
791  shared_data.results->runsFinished(num_total_runs, shared_data.pcdata);
792  }
793 
794 private:
795  void _run_task(thread_private_data & privdat, thread_shared_data * shdat, CountIntType k)
797  {
798 
799  // do not execute task if an interrupt was requested.
800  if (shdat->interrupt_requested) {
801  return;
802  }
803 
804 #pragma omp critical
805  {
806  ++ shdat->num_active_working_threads;
807  privdat.local_status_report_counter = shdat->status_report_counter;
808  }
809 
810  // construct a thread-safe logger we can use
811  TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
812 
813  // not sure an std::ostream would be safe here threadwise...?
814  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
815  "Run #%lu: thread-safe logger set up", (unsigned long)k);
816 
817  // set up our thread-private data
818  privdat.kiter = k;
819  privdat.logger = &threadsafelogger;
820 
821  // not sure an std::ostream would be safe here threadwise...?
822  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
823  "Run #%lu: querying CData for task input", (unsigned long)k);
824 
825  auto input = shdat->pcdata->getTaskInput(k);
826 
827  // not sure an std::ostream would be safe here threadwise...?
828  threadsafelogger.debug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
829  "Running task #%lu ...", (unsigned long)k);
830 
831  // construct a new task instance
832  TaskType t(input, shdat->pcdata, threadsafelogger);
833 
834  // not sure an std::ostream would be safe here threadwise...?
835  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
836  "Task #%lu set up.", (unsigned long)k);
837 
838  // and run it
839  try {
840  t.run(shdat->pcdata, threadsafelogger, &privdat);
841  } catch (const TaskInterruptedInnerException & ) {
842  return;
843  }
844 
845  bool got_exception = false;
846  std::string exception_str;
847 #pragma omp critical
848  {
849  try {
850  shdat->results->collectResult(k, t.getResult(), shdat->pcdata);
851  } catch (...) {
852  got_exception = true;
853  exception_str = std::string("Caught exception while processing status report: ")
854  + boost::current_exception_diagnostic_information();
855  }
856 
857  if ((int)privdat.local_status_report_counter != (int)shdat->status_report_counter) {
858  // status report request missed by task... do as if we had provided a
859  // report, but don't provide report.
860  ++ shdat->status_report_numreportsrecieved;
861  }
862 
863  ++ shdat->num_completed;
864  -- shdat->num_active_working_threads;
865  } // omp critical
866  if (got_exception) {
867  throw std::runtime_error(exception_str);
868  }
869 
870  }
871 
872 public:
873 
  /** \brief Assign a callable to be called whenever a status report is requested.
   *
   * The callable is invoked (by the master thread) with the completed
   * FullStatusReportType once all workers have reported.  The assignment is
   * done inside an OMP critical section so this may be called while tasks are
   * running.
   */
  inline void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
  {
#pragma omp critical
    {
      shared_data.status_report_user_fn = fnstatus;
    }
  }
892 
  /** \brief Request a status report.
   *
   * Workers notice the counter change via statusReportRequested() and submit
   * their individual reports, which are then aggregated and passed to the
   * handler set with setStatusReportHandler().  Safe to call from a signal
   * handler.
   */
  inline void requestStatusReport()
  {
    //
    // This function can be called from a signal handler. We essentially can't do
    // anything here because the state of the program can be pretty much anything,
    // including inside a malloc() or gomp lock. So can't call any function which needs
    // malloc or a #pragma omp critical.
    //
    // So just increment an atomic int.
    //
    // (The & 0x7f keeps the value well within sig_atomic_t range; see also the
    // << 6 offset used for periodic reports.)

    shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;

  }
917 
  /** \brief Request a status report every \a milliseconds milliseconds.
   *
   * Pass a non-positive value to disable periodic reports (the initial state —
   * see the thread_shared_data constructor, which starts at -1).
   */
  inline void requestPeriodicStatusReport(int milliseconds)
  {
#pragma omp critical
    {
      shared_data.status_report_periodic_interval = milliseconds;
    }
  }
932 
  /** \brief Request an immediate interruption of the tasks.
   *
   * Sets a flag which running tasks observe via statusReportRequested(); like
   * requestStatusReport(), this only writes a sig_atomic_t and is therefore
   * safe to call from a signal handler.
   */
  inline void requestInterrupt()
  {
    shared_data.interrupt_requested = 1;
  }
948 
949 
950 }; // class TaskDispatcher
951 
952 
956 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
957  typename LoggerType_, typename CountIntType_ = int>
958 inline TaskDispatcher<TaskType_, TaskCData_, ResultsCollector_,
959  LoggerType_, CountIntType_>
960 makeTaskDispatcher(TaskCData_ * pcdata_, ResultsCollector_ * results_, LoggerType_ & logger_,
961  CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
962 {
963  // RVO should be rather obvious to the compiler
964  return TaskDispatcher<TaskType_, TaskCData_, ResultsCollector_,
965  LoggerType_, CountIntType_>(
966  pcdata_, results_, logger_, num_total_runs_, n_chunk_
967  );
968 }
969 
970 
971 
972 } // namespace OMP
973 } // namespace MultiProc
974 
975 } // namespace Tomographer
976 
977 
978 
979 
980 
981 #endif
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
Definition: multiprocomp.h:222
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Definition: multiprocomp.h:364
Base namespace for the Tomographer project.
Definition: densellh.h:45
void run()
Run the specified tasks.
Definition: multiprocomp.h:704
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
Definition: multiprocomp.h:241
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Definition: multiprocomp.h:360
T duration_cast(T... args)
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
Definition: multiprocomp.h:885
Base logger class.
Definition: loggers.h:428
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
Definition: multiprocomp.h:368
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
Definition: multiprocomp.h:366
STL class.
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:451
ResultsCollector_ ResultsCollector
The type which is responsible to collect the final results of the individual tasks.
Definition: multiprocomp.h:362
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
Definition: multiprocomp.h:254
TaskDispatcher< TaskType_, TaskCData_, ResultsCollector_, LoggerType_, CountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, ResultsCollector_ *results_, LoggerType_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
Definition: multiprocomp.h:960
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType n_chunk_)
Task dispatcher constructor.
Definition: multiprocomp.h:694
void requestInterrupt()
Request an immediate interruption of the tasks.
Definition: multiprocomp.h:944
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
Definition: multiprocomp.h:370
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
Definition: multiprocomp.h:358
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
Definition: multiprocomp.h:377
Some C++ utilities, with a tad of C++11 tricks.
STL class.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:352
T c_str(T... args)
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Definition: multiprocomp.h:208
Dispatches tasks to parallel threads using OpenMP.
Definition: multiprocomp.h:352
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
Definition: multiprocomp.h:925
STL class.
Utilities for logging messages.
void requestStatusReport()
Request a status report.
Definition: multiprocomp.h:903
A complete status report, abstract version.
Definition: multiproc.h:85