Tomographer v5.0
Tomographer C++ Framework Documentation
multiprocomp.h
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCOMP_H
29 #define MULTIPROCOMP_H
30 
31 #include <csignal>
32 #include <chrono>
33 #include <stdexcept>
34 
35 #ifdef _OPENMP
36 #include <omp.h>
37 #else
38 inline constexpr int omp_get_thread_num() { return 0; }
39 inline constexpr int omp_get_num_threads() { return 1; }
40 #endif
41 
42 #include <boost/exception/diagnostic_information.hpp>
43 
44 #include <tomographer/tools/loggers.h>
45 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
46 #include <tomographer/tools/needownoperatornew.h>
47 #include <tomographer/multiproc.h>
48 
49 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP
50 // use MS Windows' Sleep() function
51 # include <windows.h>
52 # define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x))
53 #else
54 // normal C++11 API function, not available on mingw32 w/ win threads
55 # include <thread>
56 # ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD
57 # include <mingw.thread.h>
58 # endif
59 # define TOMOGRAPHER_SLEEP_FOR_MS(x) \
60  std::this_thread::sleep_for(std::chrono::milliseconds((x)))
61 #endif
62 
63 
73 namespace Tomographer {
74 namespace MultiProc {
75 namespace OMP {
76 
77 namespace tomo_internal {
78 
93 template<typename BaseLogger, bool baseLoggerIsThreadSafe>
94 struct ThreadSanitizerLoggerHelper
95 {
96  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
97  {
98  bool got_exception = false;
99  std::string exception_str;
100 #pragma omp critical
101  {
102  //fprintf(stderr, "ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- OMP CRITICAL\n", level, origin, msg.c_str());
103  try {
104  baselogger.emitLog(level, origin, msg);
105  } catch (...) {
106  got_exception = true;
107  exception_str = std::string("Caught exception in emitLog: ") + boost::current_exception_diagnostic_information();
108  }
109  }
110  if (got_exception) {
111  throw std::runtime_error(exception_str);
112  }
113  }
114  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
115  {
116  bool got_exception = false;
117  std::string exception_str;
118 
119  bool ok = true;
120 #pragma omp critical
121  {
122  //fprintf(stderr, "ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- OMP CRITICAL\n", level, origin);
123  try {
124  ok = baselogger.filterByOrigin(level, origin);
125  } catch (...) {
126  got_exception = true;
127  exception_str = std::string("Caught exception in filterByOrigin: ")
128  + boost::current_exception_diagnostic_information();
129  }
130  }
131  if (got_exception) {
132  throw std::runtime_error(exception_str);
133  }
134  return ok;
135  }
136 };
137 
138 //
139 // specialize the helper for when logging to a thread-safe base logger. No critical
140 // section needed because the logger is already thread-safe.
141 //
142 template<typename BaseLogger>
143 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
144  {
145  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
146  {
147  //fprintf(stderr, "ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- NORMAL\n", level, origin, msg.c_str());
148  baselogger.emitLog(level, origin, msg);
149  }
150  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
151  {
152  //fprintf(stderr, "ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- NORMAL\n", level, origin);
153  return baselogger.filterByOrigin(level, origin);
154  }
155 };
156 
157 } // namespace tomo_internal
158 
159 
211 template<typename BaseLogger>
212 class TOMOGRAPHER_EXPORT ThreadSanitizerLogger
213  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
214 {
215  BaseLogger & _baselogger;
216 public:
217 
226  template<typename... MoreArgs>
227  ThreadSanitizerLogger(BaseLogger & logger, MoreArgs&&...)
228  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
229  // this one, and is fixed and cannot be changed while running.
230  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
231  _baselogger(logger)
232  {
233  // when you have to debug the debug log mechanism... lol
234  //printf("ThreadSanitizerLogger(): object created\n");
235  //_baselogger.debug("ThreadSanitizerLogger()", "log from constructor.");
236  //emitLog(Logger::DEBUG, "ThreadSanitizerLogger!", "emitLog from constructor");
237  //LoggerBase<ThreadSanitizerLogger<BaseLogger> >::debug("ThreadSanitizerLogger", "debug from constructor");
238  }
239 
240  ~ThreadSanitizerLogger()
241  {
242  }
243 
244 
246  inline void emitLog(int level, const char * origin, const std::string& msg)
247  {
248  //printf("ThreadSanitizerLogger::emitLog(%d, %s, %s)\n", level, origin, msg.c_str());
249  tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
250  Logger::LoggerTraits<BaseLogger>::IsThreadSafe>
251  ::emitLog(
252  _baselogger, level, origin, msg
253  );
254  }
255 
257  template<bool dummy = true>
258  inline typename std::enable_if<dummy && Logger::LoggerTraits<BaseLogger>::HasFilterByOrigin, bool>::type
259  filterByOrigin(int level, const char * origin) const
260  {
261  return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
262  Logger::LoggerTraits<BaseLogger>::IsThreadSafe>
263  ::filterByOrigin(
264  _baselogger, level, origin
265  );
266  }
267 };
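// Usage sketch (illustrative only; Logger::FileLogger and its (FILE*, level)
// constructor are assumed to be available from <tomographer/tools/loggers.h>): each
// OpenMP thread wraps the shared, non-thread-safe base logger in its own
// ThreadSanitizerLogger, so concurrent emitLog() calls get serialized by the
// critical sections above.
//
//   Tomographer::Logger::FileLogger baselogger(stderr, Tomographer::Logger::DEBUG);
//
//   #pragma omp parallel
//   {
//     // one wrapper per thread; they all forward to the same base logger
//     Tomographer::MultiProc::OMP::ThreadSanitizerLogger<Tomographer::Logger::FileLogger>
//       threadlogger(baselogger);
//     threadlogger.debug("my_worker()", "hello from thread %d", omp_get_thread_num());
//   }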
268 
269 } // namespace OMP
270 } // namespace MultiProc
271 
272 namespace Logger {
279 template<typename BaseLogger>
280 struct TOMOGRAPHER_EXPORT LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> >
281  : public LoggerTraits<BaseLogger>
282 {
284  enum {
287  HasOwnGetLevel = 0,
289  IsThreadSafe = 1
290  };
291 };
292 } // namespace Logger
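// Sketch (assumes Logger::DefaultLoggerTraits from <tomographer/tools/loggers.h>; the
// logger type "MyThreadSafeLogger" is hypothetical): a base logger which is already
// thread-safe can advertise this through its LoggerTraits, so that ThreadSanitizerLogger
// picks the specialized ThreadSanitizerLoggerHelper above and forwards calls without
// entering an OMP critical section.
//
//   namespace Tomographer { namespace Logger {
//     template<>
//     struct LoggerTraits<MyThreadSafeLogger> : public DefaultLoggerTraits
//     {
//       enum { IsThreadSafe = 1 };
//     };
//   } } // namespace Tomographer::Logger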
293 
294 
295 namespace MultiProc {
296 namespace OMP {
297 
298 
356 template<typename TaskType_, typename TaskCData_, typename LoggerType_,
357  typename TaskCountIntType_ = int,
358  typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
359 class TOMOGRAPHER_EXPORT TaskDispatcher
360 {
361 public:
363  typedef TaskType_ TaskType;
365  typedef typename TaskType::ResultType TaskResultType;
367  typedef typename TaskType::StatusReportType TaskStatusReportType;
369  typedef TaskCData_ TaskCData;
371  typedef LoggerType_ LoggerType;
373  typedef TaskCountIntType_ TaskCountIntType;
375  typedef TaskLoggerType_ TaskLoggerType;
377  typedef FullStatusReport<TaskStatusReportType, TaskCountIntType> FullStatusReportType;
378 
384  typedef std::function<void(const FullStatusReportType&)> FullStatusReportCallbackType;
385 
386 private:
387 
388  typedef
389 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
390  std::chrono::monotonic_clock // for GCC/G++ 4.6
391 #else
392  std::chrono::steady_clock
393 #endif
394  StdClockType;
395 
396  struct TaskInterruptedInnerException : public std::exception {
397  std::string msg;
398  public:
399  TaskInterruptedInnerException() : msg("Task Interrupted") { }
400  virtual ~TaskInterruptedInnerException() throw() { };
401  const char * what() const throw() { return msg.c_str(); }
402  };
403  struct TaskInnerException : public std::exception {
404  std::string msg;
405  public:
406  TaskInnerException(std::string msgexc) : msg("Task raised an exception: "+msgexc) { }
407  virtual ~TaskInnerException() throw() { };
408  const char * what() const throw() { return msg.c_str(); }
409  };
410 
412  struct thread_shared_data {
413  thread_shared_data(const TaskCData * pcdata_, LoggerType & logger_,
414  TaskCountIntType num_total_runs_, TaskCountIntType n_chunk_)
415  : pcdata(pcdata_),
416  results(),
417  logger(logger_),
418  time_start(),
419  status_report_underway(false),
420  status_report_initialized(false),
421  status_report_ready(false),
422  status_report_counter(0),
423  status_report_periodic_interval(-1),
424  status_report_numreportsrecieved(0),
425  status_report_full(),
426  status_report_user_fn(),
427  interrupt_requested(0),
428  num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
429  num_active_working_threads(0)
430  { }
431 
432  const TaskCData * pcdata;
433  std::vector<TaskResultType*> results;
434  LoggerType & logger;
435 
436  StdClockType::time_point time_start;
437 
438  bool status_report_underway;
439  bool status_report_initialized;
440  bool status_report_ready;
441  volatile std::sig_atomic_t status_report_counter;
442  int status_report_periodic_interval;
443  TaskCountIntType status_report_numreportsrecieved;
444 
445  FullStatusReportType status_report_full;
446  FullStatusReportCallbackType status_report_user_fn;
447 
448  volatile std::sig_atomic_t interrupt_requested;
449 
450  TaskCountIntType num_total_runs;
451  TaskCountIntType n_chunk;
452  TaskCountIntType num_completed;
453 
454  TaskCountIntType num_active_working_threads;
455  };
457  struct thread_private_data
458  {
459  thread_shared_data * shared_data;
460 
461  TaskLoggerType * logger;
462 
463  TaskCountIntType kiter;
464  int local_status_report_counter;
465 
466  inline bool statusReportRequested() const
467  {
468  if (shared_data->interrupt_requested) {
469  throw TaskInterruptedInnerException();
470  }
471 
472  //
473  // if we're the master thread, we have some admin to do.
474  //
475  // NOTE: #pragma omp master prevents us from throwing an exception! (at least on clang++3.8)
476  if (omp_get_thread_num() == 0) {
477  // Update the status_report_counter according to whether
478  // we should provoke a periodic status report
479  if (shared_data->status_report_periodic_interval > 0) {
480  _master_thread_update_status_report_periodic_interval_counter();
481  }
482 
483  // if we're the master thread, then also check if there is a status report ready
484  // to be sent.
485  if (shared_data->status_report_ready) {
486  bool got_exception = false;
487  std::string exception_str;
488 #pragma omp critical
489  {
490  try {
491  // call user-defined status report handler
492  shared_data->status_report_user_fn(std::move(shared_data->status_report_full));
493  // all reports received: done --> reset our status_report_* flags
494  shared_data->status_report_numreportsrecieved = 0;
495  shared_data->status_report_underway = false;
496  shared_data->status_report_initialized = false;
497  shared_data->status_report_ready = false;
498  shared_data->status_report_full.workers_running.clear();
499  shared_data->status_report_full.workers_reports.clear();
500  } catch (...) {
501  got_exception = true;
502  exception_str = std::string("Caught exception in status reporting callback: ")
503  + boost::current_exception_diagnostic_information();
504  }
505  }
506  if (got_exception) {
507  throw TaskInnerException(exception_str);
508  }
509  }
510  } // omp master
511 
512  return local_status_report_counter != (int)shared_data->status_report_counter;
513  }
514 
515  // internal use only:
516  inline void _master_thread_update_status_report_periodic_interval_counter() const
517  {
518  shared_data->status_report_counter = (
519  std::sig_atomic_t(std::chrono::duration_cast<std::chrono::milliseconds>(
520  StdClockType::now().time_since_epoch()
521  ).count() / shared_data->status_report_periodic_interval) & 0x00FFFFFF
522  ) << 6;
523  // the (x << 6) (equivalent to (x * 64)) allows individual increments from
524  // unrelated additional requestStatusReport() to be taken into account (allows 64
525  // such additional requests per periodic status report)
526  }
527 
528  inline void submitStatusReport(const TaskStatusReportType &statreport)
529  {
530  if (local_status_report_counter == (int)shared_data->status_report_counter) {
531  // error: task submitted unsolicited report
532  logger->warning("OMP TaskDispatcher/taskmanageriface", "Task submitted unsolicited status report");
533  return;
534  }
535 
536 
537  bool got_exception = false;
538  std::string exception_str;
539 #pragma omp critical
540  {
541  try {
542  bool ok = true; // whether to proceed or not
543 
544  // we've reacted to the given "signal"
545  local_status_report_counter = shared_data->status_report_counter;
546 
547  // add our status report to the status report being prepared in the shared data
548  int threadnum = omp_get_thread_num();
549 
550  // Important: access the original logger, not the thread-safe-wrapped logger!!
551  // Otherwise this could lead to deadlocks because of nested critical blocks.
552  shared_data->logger.longdebug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
553  stream << "status report received for thread #" << threadnum << ", treating it ...";
554  });
555 
556  //
557  // If we're the first reporting thread, we need to initiate the status reporting
558  // procedure and initialize the general data
559  //
560  if (!shared_data->status_report_initialized) {
561 
562  //
563  // Check that we indeed have to submit a status report.
564  //
565  if (shared_data->status_report_underway) {
566  // status report already underway!
567  // Important: access the original logger, not the thread-safe-wrapped logger!!
568  // Otherwise this could lead to deadlocks because of nested critical blocks.
569  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
570  "status report already underway!");
571  ok = false;
572  }
573  if (!shared_data->status_report_user_fn) {
574  // no user handler set
575  // Important: access the original logger, not the thread-safe-wrapped logger!!
576  // Otherwise this could lead to deadlocks because of nested critical blocks.
577  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
578  "no user status report handler set!"
579  " call setStatusReportHandler() first.");
580  ok = false;
581  }
582 
583  // since we can't return out of a critical section, we use an if block.
584  if (ok) {
585 
586  shared_data->status_report_underway = true;
587  shared_data->status_report_initialized = true;
588  shared_data->status_report_ready = false;
589 
590  // initialize status report object & overall data
591  shared_data->status_report_full = FullStatusReportType();
592  shared_data->status_report_full.num_completed = shared_data->num_completed;
593  shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
594  shared_data->status_report_full.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
595  StdClockType::now() - shared_data->time_start
596  ).count() * 1e-3;
597  // shared_data->status_report_full.num_active_working_threads = shared_data->num_active_working_threads;
598  int num_threads = omp_get_num_threads();
599  // shared_data->status_report_full.num_threads = num_threads;
600 
601  // initialize task-specific reports
602  // fill our lists with default-constructed values & set all running to false.
603  shared_data->status_report_full.workers_running.clear();
604  shared_data->status_report_full.workers_reports.clear();
605  shared_data->status_report_full.workers_running.resize((std::size_t)num_threads, false);
606  shared_data->status_report_full.workers_reports.resize((std::size_t)num_threads);
607 
608  shared_data->status_report_numreportsrecieved = 0;
609 
610  // Important: access the original logger, not the thread-safe-wrapped logger!!
611  // Otherwise this could lead to deadlocks because of nested critical blocks.
612  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
613  stream << "vectors resized to workers_running.size()="
614  << shared_data->status_report_full.workers_running.size()
615  << " and workers_reports.size()="
616  << shared_data->status_report_full.workers_reports.size()
617  << ".";
618  });
619  }
620 
621  } // status_report_initialized
622 
623  // if we're the first reporting thread, then maybe ok was set to false above, so
624  // check again.
625  if (ok) {
626 
627  //
628  // Report the data corresponding to this thread.
629  //
630  // Important: access the original logger, not the thread-safe-wrapped logger!!
631  // Otherwise this could lead to deadlocks because of nested critical blocks.
632  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", "threadnum=%ld, workers_reports.size()=%ld",
633  (long)threadnum, (long)shared_data->status_report_full.workers_reports.size());
634 
635  tomographer_assert(0 <= threadnum &&
636  (std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
637 
638  shared_data->status_report_full.workers_running[(std::size_t)threadnum] = true;
639  shared_data->status_report_full.workers_reports[(std::size_t)threadnum] = statreport;
640 
641  ++ shared_data->status_report_numreportsrecieved;
642 
643  if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
644  //
645  // the report is ready to be transmitted to the user: go!
646  //
647  // Don't send it quite yet, queue it for the master thread to send. We add
648  // this guarantee so that the status report handler can do things which only
649  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
650  shared_data->status_report_ready = true;
651  }
652  } // if ok
653  } catch (...) {
654  // std::string msg(boost::current_exception_diagnostic_information());
655  // fprintf(stderr, "CAUGHT AN EXCEPTION: %s\n", msg.c_str());
656 
657  // if an exception occurred, propagate it out to end the task and cause an
658  // interruption. The user may not know what caused the interrupt. Don't
659  // terminate or cause a huge fuss, as this might actually be controlled
660  // (e.g. boost::python::error_already_set). Also, the logger itself may have
661  // caused the exception, so don't use the logger here!!:
662  //shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
663  // stream << "Error while processing status report, exception caught: "
664  // << boost::current_exception_diagnostic_information();
665  // });
666  got_exception = true;
667  exception_str = std::string("Caught exception while processing status report: ")
668  + boost::current_exception_diagnostic_information();
669  }
670  } // omp critical
671  if (got_exception) {
672  throw TaskInnerException(exception_str);
673  }
674  }
675  };
676 
677  thread_shared_data shared_data;
678 
679 public:
698  TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_, TaskCountIntType num_total_runs_,
699  TaskCountIntType n_chunk_ = 1)
700  : shared_data(pcdata_, logger_, num_total_runs_, n_chunk_)
701  {
702  }
703 
704  ~TaskDispatcher()
705  {
706  for (auto r : shared_data.results) {
707  if (r != NULL) {
708  delete r;
709  }
710  }
711  }
712 
717  void run()
718  {
719  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "Let's go!");
720  shared_data.time_start = StdClockType::now();
721 
722  shared_data.results.resize((std::size_t)shared_data.num_total_runs, NULL);
723 
724  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "preparing for parallel runs");
725 
726 #ifndef _OPENMP
727  shared_data.logger.warning("MultiProc::OMP::TaskDispatcher::run()", "OpenMP is disabled; tasks will run serially.");
728 #endif
729 
730  // declaring these as "const" causes a weird compiler error
731  // "`n_chunk' is predetermined `shared' for `shared'"
732  TaskCountIntType num_total_runs = shared_data.num_total_runs;
733  TaskCountIntType n_chunk = shared_data.n_chunk;
734  (void)n_chunk; // silence "unused variable" warning when compiling without OMP support
735 
736  TaskCountIntType k = 0;
737 
738  thread_shared_data *shdat = &shared_data;
739  thread_private_data privdat;
740 
741  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "About to start parallel section");
742 
743  int num_active_parallel = 0;
744 
745  std::string inner_exception;
746 
747 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk, num_active_parallel, inner_exception)
748  {
749  privdat.shared_data = shdat;
750  privdat.kiter = 0;
751 
752 #pragma omp atomic
753  ++num_active_parallel;
754 
755  //
756  // The main, parallel FOR loop:
757  //
758 #pragma omp for schedule(dynamic,n_chunk) nowait
759  for (k = 0; k < num_total_runs; ++k) {
760 
761  try {
762 
763  // make separate function call, so that we can tell GCC to realign the stack on
764  // platforms which don't do that automatically (yes, MinGW, it's you I'm looking
765  // at)
766  _run_task(privdat, shdat, k);
767 
768  } catch (...) {
769 #pragma omp critical
770  {
771  shdat->interrupt_requested = 1;
772  inner_exception += std::string("Exception caught inside task: ")
773  + boost::current_exception_diagnostic_information() + "\n";
774  }
775  }
776 
777  } // omp for
778 
779 #pragma omp atomic
780  --num_active_parallel;
781 
782 #pragma omp master
783  {
784  if (shdat->status_report_periodic_interval > 0) {
785  // master thread should continue providing regular status updates
786  while (num_active_parallel > 0) {
787  TOMOGRAPHER_SLEEP_FOR_MS(shdat->status_report_periodic_interval);
788  privdat._master_thread_update_status_report_periodic_interval_counter();
789  }
790  }
791  }
792 
793  } // omp parallel
794 
795  if (inner_exception.size()) {
796  // interrupt was requested because of an inner exception, not an explicit interrupt request
797  throw std::runtime_error(inner_exception);
798  }
799 
800  if (shared_data.interrupt_requested) {
801  throw TasksInterruptedException(); // assumed: the interruption exception provided by <tomographer/multiproc.h>
802  }
803 
804  }
805 
809  inline TaskCountIntType numTaskRuns() const {
810  return shared_data.num_total_runs;
811  }
812 
816  inline const std::vector<TaskResultType*> & collectedTaskResults() const {
817  return shared_data.results;
818  }
819 
823  inline const TaskResultType & collectedTaskResult(std::size_t k) const {
824  return *shared_data.results[k];
825  }
826 
827 private:
828  void _run_task(thread_private_data & privdat, thread_shared_data * shdat, TaskCountIntType k)
829  TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
830  {
831 
832  // do not execute task if an interrupt was requested.
833  if (shdat->interrupt_requested) {
834  return;
835  }
836 
837 #pragma omp critical
838  {
839  ++ shdat->num_active_working_threads;
840  privdat.local_status_report_counter = shdat->status_report_counter;
841  }
842 
843  // construct a thread-safe logger we can use
844  TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
845 
846  // not sure an std::ostream would be safe here threadwise...?
847  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
848  "Run #%lu: thread-safe logger set up", (unsigned long)k);
849 
850  // set up our thread-private data
851  privdat.kiter = k;
852  privdat.logger = &threadsafelogger;
853 
854  // not sure an std::ostream would be safe here threadwise...?
855  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
856  "Run #%lu: querying CData for task input", (unsigned long)k);
857 
858  auto input = shdat->pcdata->getTaskInput(k);
859 
860  // not sure an std::ostream would be safe here threadwise...?
861  threadsafelogger.debug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
862  "Running task #%lu ...", (unsigned long)k);
863 
864  // construct a new task instance
865  TaskType t(input, shdat->pcdata, threadsafelogger);
866 
867  // not sure an std::ostream would be safe here threadwise...?
868  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
869  "Task #%lu set up.", (unsigned long)k);
870 
871  // and run it
872  try {
873  t.run(shdat->pcdata, threadsafelogger, &privdat);
874  } catch (const TaskInterruptedInnerException & ) {
875  return;
876  }
877 
878  bool got_exception = false;
879  std::string exception_str;
880 #pragma omp critical
881  {
882  try {
883  shdat->results[(std::size_t)k] = new TaskResultType(t.stealResult());
884  } catch (...) {
885  got_exception = true;
886  exception_str = std::string("Caught exception while storing result: ")
887  + boost::current_exception_diagnostic_information();
888  }
889 
890  if (privdat.local_status_report_counter != (int)shdat->status_report_counter) {
891  // status report request missed by task... do as if we had provided a
892  // report, but don't provide report.
893  ++ shdat->status_report_numreportsrecieved;
894  }
895 
896  ++ shdat->num_completed;
897  -- shdat->num_active_working_threads;
898  } // omp critical
899  if (got_exception) {
900  throw std::runtime_error(exception_str);
901  }
902 
903  }
904 
905 public:
906 
918  inline void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
919  {
920 #pragma omp critical
921  {
922  shared_data.status_report_user_fn = fnstatus;
923  }
924  }
925 
936  inline void requestStatusReport()
937  {
938  //
939  // This function can be called from a signal handler. We essentially can't do
940  // anything here because the state of the program can be pretty much anything,
941  // including inside a malloc() or gomp lock. So can't call any function which needs
942  // malloc or a #pragma omp critical.
943  //
944  // So just increment an atomic int.
945  //
946 
947  shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
948 
949  }
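// Illustrative sketch (hypothetical application code; "g_dispatcher" and the concrete
// dispatcher type "MyTaskDispatcherType" are assumptions): since this call only bumps a
// sig_atomic_t counter, it can be invoked from a POSIX signal handler, e.g. to print a
// status report on Ctrl+C.
//
//   static MyTaskDispatcherType * g_dispatcher = nullptr;
//
//   extern "C" void on_sigint(int)
//   {
//     if (g_dispatcher != nullptr) {
//       g_dispatcher->requestStatusReport(); // only increments the atomic counter
//     }
//   }
//
//   // in main():  std::signal(SIGINT, on_sigint);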
950 
958  inline void requestPeriodicStatusReport(int milliseconds)
959  {
960 #pragma omp critical
961  {
962  shared_data.status_report_periodic_interval = milliseconds;
963  }
964  }
965 
977  inline void requestInterrupt()
978  {
979  shared_data.interrupt_requested = 1;
980  }
981 
982 
983 }; // class TaskDispatcher
984 
985 
989 template<typename TaskType_, typename TaskCData_,
990  typename LoggerType_, typename TaskCountIntType_ = int>
991 inline TaskDispatcher<TaskType_, TaskCData_,
992  LoggerType_, TaskCountIntType_>
993 makeTaskDispatcher(TaskCData_ * pcdata_, LoggerType_ & logger_,
994  TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_ = 1)
995 {
996  // RVO should be rather obvious to the compiler
997  return TaskDispatcher<TaskType_, TaskCData_,
998  LoggerType_, TaskCountIntType_>(
999  pcdata_, logger_, num_total_runs_, n_chunk_
1000  );
1001 }
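// End-to-end usage sketch (illustrative; "MyTask" and "MyCData" are hypothetical types
// assumed to implement the task interface this dispatcher expects, i.e.
// MyCData::getTaskInput(), a MyTask(input, pcdata, logger) constructor, MyTask::run()
// and MyTask::stealResult(); Logger::FileLogger is assumed from
// <tomographer/tools/loggers.h>):
//
//   Tomographer::Logger::FileLogger logger(stderr, Tomographer::Logger::INFO);
//   MyCData cdata(/* ... constant data shared by all tasks ... */);
//
//   auto tasks = Tomographer::MultiProc::OMP::makeTaskDispatcher<MyTask>(
//       &cdata, logger, 128 /* num_total_runs */, 1 /* n_chunk */);
//
//   tasks.setStatusReportHandler(
//       [&](const decltype(tasks)::FullStatusReportType & report) {
//         logger.info("main()", "completed %d / %d task runs",
//                     (int)report.num_completed, (int)report.num_total_runs);
//       });
//   tasks.requestPeriodicStatusReport(500); // status report every 500 ms
//
//   tasks.run(); // blocks until all tasks have finished (throws on error/interruption)
//
//   const auto & results = tasks.collectedTaskResults(); // vector of TaskResultType*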
1002 
1003 
1004 
1005 } // namespace OMP
1006 } // namespace MultiProc
1007 
1008 } // namespace Tomographer
1009 
1010 
1011 
1012 
1013 
1014 #endif