Loading [MathJax]/extensions/tex2jax.js
Tomographerv4.1
Tomographer C++ Framework Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
multiprocomp.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCOMP_H
29 #define MULTIPROCOMP_H
30 
31 #include <csignal>
32 #include <chrono>
33 #include <thread>
34 #include <stdexcept>
35 
36 #ifdef _OPENMP
37 #include <omp.h>
38 #else
39 inline constexpr int omp_get_thread_num() { return 0; }
40 inline constexpr int omp_get_num_threads() { return 1; }
41 #endif
42 
43 #include <boost/exception/diagnostic_information.hpp>
44 
46 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
48 #include <tomographer/multiproc.h>
49 
50 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP
51 // use MS Window's Sleep() function
52 #include <windows.h>
53 #define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x))
54 #else
55 // normal C++11 API function, not available on mingw32 w/ win threads
56 #define TOMOGRAPHER_SLEEP_FOR_MS(x) std::this_thread::sleep_for(std::chrono::milliseconds((x)))
57 #endif
58 
59 
69 namespace Tomographer {
70 namespace MultiProc {
71 namespace OMP {
72 
73 namespace tomo_internal {
74 
89 template<typename BaseLogger, bool baseLoggerIsThreadSafe>
90 struct ThreadSanitizerLoggerHelper
91 {
92  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
93  {
94  bool got_exception = false;
95  std::string exception_str;
96 #pragma omp critical
97  {
98  //fprintf(stderr, "ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- OMP CRITICAL\n", level, origin, msg.c_str());
99  try {
100  baselogger.emitLog(level, origin, msg);
101  } catch (...) {
102  got_exception = true;
103  exception_str = std::string("Caught exception in emitLog: ") + boost::current_exception_diagnostic_information();
104  }
105  }
106  if (got_exception) {
107  throw std::runtime_error(exception_str);
108  }
109  }
110  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
111  {
112  bool got_exception = false;
113  std::string exception_str;
114 
115  bool ok = true;
116 #pragma omp critical
117  {
118  //fprintf(stderr, "ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- OMP CRITICAL\n", level, origin);
119  try {
120  ok = baselogger.filterByOrigin(level, origin);
121  } catch (...) {
122  got_exception = true;
123  exception_str = std::string("Caught exception in filterByOrigni: ")
124  + boost::current_exception_diagnostic_information();
125  }
126  }
127  if (got_exception) {
128  throw std::runtime_error(exception_str);
129  }
130  return ok;
131  }
132 };
133 
134 //
135 // specialize the helper for when logging to a thread-safe base logger. No critical
136 // section needed because the logger is already thread-safe.
137 //
138 template<typename BaseLogger>
139 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
140  {
141  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
142  {
143  //fprintf(stderr, "ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- NORMAL\n", level, origin, msg.c_str());
144  baselogger.emitLog(level, origin, msg);
145  }
146  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
147  {
148  //fprintf(stderr, "ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- NORMAL\n", level, origin);
149  return baselogger.filterByOrigin(level, origin);
150  }
151 };
152 
153 } // namespace tomo_internal
154 
155 
207 template<typename BaseLogger>
208 TOMOGRAPHER_EXPORT class ThreadSanitizerLogger
209  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
210 {
211  BaseLogger & _baselogger;
212 public:
213 
222  template<typename... MoreArgs>
223  ThreadSanitizerLogger(BaseLogger & logger, MoreArgs&&...)
224  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
225  // this one, and is fixed and cannot be changed while running.
226  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
227  _baselogger(logger)
228  {
229  // when you have to debug the debug log mechanism... lol
230  //printf("ThreadSanitizerLogger(): object created\n");
231  //_baselogger.debug("ThreadSanitizerLogger()", "log from constructor.");
232  //emitLog(Logger::DEBUG, "ThreadSanitizerLogger!", "emitLog from constructor");
233  //LoggerBase<ThreadSanitizerLogger<BaseLogger> >::debug("ThreadSanitizerLogger", "debug from constructor");
234  }
235 
237  {
238  }
239 
240 
242  inline void emitLog(int level, const char * origin, const std::string& msg)
243  {
244  //printf("ThreadSanitizerLogger::emitLog(%d, %s, %s)\n", level, origin, msg.c_str());
245  tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
247  ::emitLog(
248  _baselogger, level, origin, msg
249  );
250  }
251 
253  template<bool dummy = true>
255  filterByOrigin(int level, const char * origin) const
256  {
257  return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
259  ::filterByOrigin(
260  _baselogger, level, origin
261  );
262  }
263 };
264 
265 } // namespace OMP
266 } // namespace MultiProc
267 
268 namespace Logger {
275 template<typename BaseLogger>
276 struct LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> > : public LoggerTraits<BaseLogger>
277 {
279  enum {
282  HasOwnGetLevel = 0,
284  IsThreadSafe = 1
285  };
286 };
287 } // namespace Logger
288 
289 
290 namespace MultiProc {
291 namespace OMP {
292 
293 
350 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
351  typename LoggerType_, typename CountIntType_ = int,
352  typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
353 TOMOGRAPHER_EXPORT class TaskDispatcher
354 {
355 public:
357  typedef TaskType_ TaskType;
359  typedef typename TaskType::StatusReportType TaskStatusReportType;
361  typedef TaskCData_ TaskCData;
363  typedef ResultsCollector_ ResultsCollector;
365  typedef LoggerType_ LoggerType;
367  typedef CountIntType_ CountIntType;
369  typedef TaskLoggerType_ TaskLoggerType;
372 
379 
380 private:
381 
382  typedef
383 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
384  std::chrono::monotonic_clock // for GCC/G++ 4.6
385 #else
387 #endif
388  StdClockType;
389 
390  struct TaskInterruptedInnerException : public std::exception {
391  std::string msg;
392  public:
393  TaskInterruptedInnerException() : msg("Task Interrupted") { }
394  virtual ~TaskInterruptedInnerException() throw() { };
395  const char * what() const throw() { return msg.c_str(); }
396  };
397  struct TaskInnerException : public std::exception {
398  std::string msg;
399  public:
400  TaskInnerException(std::string msgexc) : msg("Task raised an exception: "+msgexc) { }
401  virtual ~TaskInnerException() throw() { };
402  const char * what() const throw() { return msg.c_str(); }
403  };
404 
406  struct thread_shared_data {
407  thread_shared_data(const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
408  CountIntType num_total_runs_, CountIntType n_chunk_)
409  : pcdata(pcdata_),
410  results(results_),
411  logger(logger_),
412  time_start(),
413  status_report_underway(false),
414  status_report_initialized(false),
415  status_report_ready(false),
416  status_report_counter(0),
417  status_report_periodic_interval(-1),
418  status_report_numreportsrecieved(0),
419  status_report_full(),
420  status_report_user_fn(),
421  interrupt_requested(0),
422  num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
423  num_active_working_threads(0)
424  { }
425 
426  const TaskCData * pcdata;
427  ResultsCollector * results;
428  LoggerType & logger;
429 
430  StdClockType::time_point time_start;
431 
432  bool status_report_underway;
433  bool status_report_initialized;
434  bool status_report_ready;
435  volatile std::sig_atomic_t status_report_counter;
436  int status_report_periodic_interval;
437  CountIntType status_report_numreportsrecieved;
438 
439  FullStatusReportType status_report_full;
440  FullStatusReportCallbackType status_report_user_fn;
441 
442  volatile std::sig_atomic_t interrupt_requested;
443 
444  CountIntType num_total_runs;
445  CountIntType n_chunk;
446  CountIntType num_completed;
447 
448  CountIntType num_active_working_threads;
449  };
451  struct thread_private_data
452  {
453  thread_shared_data * shared_data;
454 
455  TaskLoggerType * logger;
456 
457  CountIntType kiter;
458  CountIntType local_status_report_counter;
459 
460  inline bool statusReportRequested() const
461  {
462  if (shared_data->interrupt_requested) {
463  throw TaskInterruptedInnerException();
464  }
465 
466  //
467  // if we're the master thread, we have some admin to do.
468  //
469  // NOTE: #pragma omp master prevents us from throwing an exception! (at least on clang++3.8)
470  if (omp_get_thread_num() == 0) {
471  // Update the status_report_counter according to whether
472  // we should provoke a periodic status report
473  if (shared_data->status_report_periodic_interval > 0) {
474  _master_thread_update_status_report_periodic_interval_counter();
475  }
476 
477  // if we're the master thread, then also check if there is a status report ready
478  // to be sent.
479  if (shared_data->status_report_ready) {
480  bool got_exception = false;
481  std::string exception_str;
482 #pragma omp critical
483  {
484  try {
485  // call user-defined status report handler
486  shared_data->status_report_user_fn(shared_data->status_report_full);
487  // all reports recieved: done --> reset our status_report_* flags
488  shared_data->status_report_numreportsrecieved = 0;
489  shared_data->status_report_underway = false;
490  shared_data->status_report_initialized = false;
491  shared_data->status_report_ready = false;
492  shared_data->status_report_full.workers_running.clear();
493  shared_data->status_report_full.workers_reports.clear();
494  } catch (...) {
495  got_exception = true;
496  exception_str = std::string("Caught exception in status reporting callback: ")
497  + boost::current_exception_diagnostic_information();
498  }
499  }
500  if (got_exception) {
501  throw TaskInnerException(exception_str);
502  }
503  }
504  } // omp master
505 
506  return (int)local_status_report_counter != (int)shared_data->status_report_counter;
507  }
508 
509  // internal use only:
510  inline void _master_thread_update_status_report_periodic_interval_counter() const
511  {
512  shared_data->status_report_counter = (
514  StdClockType::now().time_since_epoch()
515  ).count() / shared_data->status_report_periodic_interval) & 0x00FFFFFF
516  ) << 6;
517  // the (x << 6) (equivalent to (x * 64)) allows individual increments from
518  // unrelated additional requestStatusReport() to be taken into account (allows 64
519  // such additional requests per periodic status report)
520  }
521 
522  inline void submitStatusReport(const TaskStatusReportType &statreport)
523  {
524  if ((int)local_status_report_counter == (int)shared_data->status_report_counter) {
525  // error: task submitted unsollicited report
526  logger->warning("OMP TaskDispatcher/taskmanageriface", "Task submitted unsollicited status report");
527  return;
528  }
529 
530 
531  bool got_exception = false;
532  std::string exception_str;
533 #pragma omp critical
534  {
535  try {
536  bool ok = true; // whether to proceed or not
537 
538  // we've reacted to the given "signal"
539  local_status_report_counter = shared_data->status_report_counter;
540 
541  // add our status report to being-prepared status report in the shared data
542  int threadnum = omp_get_thread_num();
543 
544  // Important: access the original logger, not the thread-safe-wrapped logger!!
545  // Otherwise this could lead to deadlocks because of nested critical blocks.
546  shared_data->logger.longdebug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
547  stream << "status report received for thread #" << threadnum << ", treating it ...";
548  });
549 
550  //
551  // If we're the first reporting thread, we need to initiate the status reporing
552  // procedure and initialize the general data
553  //
554  if (!shared_data->status_report_initialized) {
555 
556  //
557  // Check that we indeed have to submit a status report.
558  //
559  if (shared_data->status_report_underway) {
560  // status report already underway!
561  // Important: access the original logger, not the thread-safe-wrapped logger!!
562  // Otherwise this could lead to deadlocks because of nested critical blocks.
563  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
564  "status report already underway!");
565  ok = false;
566  }
567  if (!shared_data->status_report_user_fn) {
568  // no user handler set
569  // Important: access the original logger, not the thread-safe-wrapped logger!!
570  // Otherwise this could lead to deadlocks because of nested critical blocks.
571  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
572  "no user status report handler set!"
573  " call setStatusReportHandler() first.");
574  ok = false;
575  }
576 
577  // since we can't return out of a critical section, we use an if block.
578  if (ok) {
579 
580  shared_data->status_report_underway = true;
581  shared_data->status_report_initialized = true;
582  shared_data->status_report_ready = false;
583 
584  // initialize status report object & overall data
585  shared_data->status_report_full = FullStatusReportType();
586  shared_data->status_report_full.num_completed = shared_data->num_completed;
587  shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
588  shared_data->status_report_full.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
589  StdClockType::now() - shared_data->time_start
590  ).count() * 1e-3;
591  // shared_data->status_report_full.num_active_working_threads = shared_data->num_active_working_threads;
592  int num_threads = omp_get_num_threads();
593  // shared_data->status_report_full.num_threads = num_threads;
594 
595  // initialize task-specific reports
596  // fill our lists with default-constructed values & set all running to false.
597  shared_data->status_report_full.workers_running.clear();
598  shared_data->status_report_full.workers_reports.clear();
599  shared_data->status_report_full.workers_running.resize(num_threads, false);
600  shared_data->status_report_full.workers_reports.resize(num_threads);
601 
602  shared_data->status_report_numreportsrecieved = 0;
603 
604  // Important: access the original logger, not the thread-safe-wrapped logger!!
605  // Otherwise this could lead to deadlocks because of nested critical blocks.
606  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
607  stream << "vectors resized to workers_running.size()="
608  << shared_data->status_report_full.workers_running.size()
609  << " and workers_reports.size()="
610  << shared_data->status_report_full.workers_reports.size()
611  << ".";
612  });
613  }
614 
615  } // status_report_initialized
616 
617  // if we're the first reporting thread, then maybe ok was set to false above, so
618  // check again.
619  if (ok) {
620 
621  //
622  // Report the data corresponding to this thread.
623  //
624  // Important: access the original logger, not the thread-safe-wrapped logger!!
625  // Otherwise this could lead to deadlocks because of nested critical blocks.
626  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", "threadnum=%ld, workers_reports.size()=%ld",
627  (long)threadnum, (long)shared_data->status_report_full.workers_reports.size());
628 
629  tomographer_assert(0 <= threadnum &&
630  (std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
631 
632  shared_data->status_report_full.workers_running[threadnum] = true;
633  shared_data->status_report_full.workers_reports[threadnum] = statreport;
634 
635  ++ shared_data->status_report_numreportsrecieved;
636 
637  if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
638  //
639  // the report is ready to be transmitted to the user: go!
640  //
641  // Don't send it quite yet, queue it for the master thread to send. We add
642  // this guarantee so that the status report handler can do things which only
643  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
644  shared_data->status_report_ready = true;
645  }
646  } // if ok
647  } catch (...) {
648  // std::string msg(boost::current_exception_diagnostic_information());
649  // fprintf(stderr, "CAUGHT AN EXCEPTION: %s\n", msg.c_str());
650 
651  // if an exception occurred propagate it out to end the task and cause an
652  // interruption. The user may not know what caused the interrupt. Don't
653  // terminate or cause a huge fuss, as this might be actually controlled
654  // (e.g. boost::python::error_already_set) Also, the logger itself may have
655  // caused the exception, so don't use the logger here!!:
656  //shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
657  // stream << "Error while processing status report, exception caught: "
658  // << boost::current_exception_diagnostic_information();
659  // });
660  got_exception = true;
661  exception_str = std::string("Caught exception while processing status report: ")
662  + boost::current_exception_diagnostic_information();
663  }
664  } // omp critical
665  if (got_exception) {
666  throw TaskInnerException(exception_str);
667  }
668  }
669  };
670 
671  thread_shared_data shared_data;
672 
673 public:
695  TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
696  CountIntType num_total_runs_, CountIntType n_chunk_ = 1)
697  : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
698  {
699  }
700 
705  void run()
706  {
707  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "Let's go!");
708  shared_data.time_start = StdClockType::now();
709 
710  shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
711 
712  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "preparing for parallel runs");
713 
714 #ifndef _OPENMP
715  shared_data.logger.warning("MultiProc::OMP::TaskDispatcher::run()", "OpenMP is disabled; tasks will run serially.");
716 #endif
717 
718  // declaring these as "const" causes a weird compiler error
719  // "`n_chunk' is predetermined `shared' for `shared'"
720  CountIntType num_total_runs = shared_data.num_total_runs;
721  CountIntType n_chunk = shared_data.n_chunk;
722  (void)n_chunk; // silence "unused variable" warning when compiling without OMP support
723 
724  CountIntType k = 0;
725 
726  thread_shared_data *shdat = &shared_data;
727  thread_private_data privdat;
728 
729  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "About to start parallel section");
730 
731  int num_active_parallel = 0;
732 
733  std::string inner_exception;
734 
735 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk, num_active_parallel, inner_exception)
736  {
737  privdat.shared_data = shdat;
738  privdat.kiter = 0;
739 
740 #pragma omp atomic
741  ++num_active_parallel;
742 
743  //
744  // The main, parallel FOR loop:
745  //
746 #pragma omp for schedule(dynamic,n_chunk) nowait
747  for (k = 0; k < num_total_runs; ++k) {
748 
749  try {
750 
751  // make separate function call, so that we can tell GCC to realign the stack on
752  // platforms which don't do that automatically (yes, MinGW, it's you I'm looking
753  // at)
754  _run_task(privdat, shdat, k);
755 
756  } catch (...) {
757 #pragma omp critical
758  {
759  shdat->interrupt_requested = 1;
760  inner_exception += std::string("Exception caught inside task: ")
761  + boost::current_exception_diagnostic_information() + "\n";
762  }
763  }
764 
765  } // omp for
766 
767 #pragma omp atomic
768  --num_active_parallel;
769 
770 #pragma omp master
771  {
772  if (shdat->status_report_periodic_interval > 0) {
773  // master thread should continue providing regular status updates
774  while (num_active_parallel > 0) {
775  TOMOGRAPHER_SLEEP_FOR_MS(shdat->status_report_periodic_interval);
776  privdat._master_thread_update_status_report_periodic_interval_counter();
777  }
778  }
779  }
780 
781  } // omp parallel
782 
783  if (inner_exception.size()) {
784  // interrupt was requested because of an inner exception, not an explicit interrupt request
785  throw std::runtime_error(inner_exception);
786  }
787 
788  if (shared_data.interrupt_requested) {
790  }
791 
792  shared_data.results->runsFinished(num_total_runs, shared_data.pcdata);
793  }
794 
795 private:
796  void _run_task(thread_private_data & privdat, thread_shared_data * shdat, CountIntType k)
798  {
799 
800  // do not execute task if an interrupt was requested.
801  if (shdat->interrupt_requested) {
802  return;
803  }
804 
805 #pragma omp critical
806  {
807  ++ shdat->num_active_working_threads;
808  privdat.local_status_report_counter = shdat->status_report_counter;
809  }
810 
811  // construct a thread-safe logger we can use
812  TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
813 
814  // not sure an std::ostream would be safe here threadwise...?
815  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
816  "Run #%lu: thread-safe logger set up", (unsigned long)k);
817 
818  // set up our thread-private data
819  privdat.kiter = k;
820  privdat.logger = &threadsafelogger;
821 
822  // not sure an std::ostream would be safe here threadwise...?
823  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
824  "Run #%lu: querying CData for task input", (unsigned long)k);
825 
826  auto input = shdat->pcdata->getTaskInput(k);
827 
828  // not sure an std::ostream would be safe here threadwise...?
829  threadsafelogger.debug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
830  "Running task #%lu ...", (unsigned long)k);
831 
832  // construct a new task instance
833  TaskType t(input, shdat->pcdata, threadsafelogger);
834 
835  // not sure an std::ostream would be safe here threadwise...?
836  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
837  "Task #%lu set up.", (unsigned long)k);
838 
839  // and run it
840  try {
841  t.run(shdat->pcdata, threadsafelogger, &privdat);
842  } catch (const TaskInterruptedInnerException & ) {
843  return;
844  }
845 
846  bool got_exception = false;
847  std::string exception_str;
848 #pragma omp critical
849  {
850  try {
851  shdat->results->collectResult(k, t.getResult(), shdat->pcdata);
852  } catch (...) {
853  got_exception = true;
854  exception_str = std::string("Caught exception while processing status report: ")
855  + boost::current_exception_diagnostic_information();
856  }
857 
858  if ((int)privdat.local_status_report_counter != (int)shdat->status_report_counter) {
859  // status report request missed by task... do as if we had provided a
860  // report, but don't provide report.
861  ++ shdat->status_report_numreportsrecieved;
862  }
863 
864  ++ shdat->num_completed;
865  -- shdat->num_active_working_threads;
866  } // omp critical
867  if (got_exception) {
868  throw std::runtime_error(exception_str);
869  }
870 
871  }
872 
873 public:
874 
886  inline void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
887  {
888 #pragma omp critical
889  {
890  shared_data.status_report_user_fn = fnstatus;
891  }
892  }
893 
904  inline void requestStatusReport()
905  {
906  //
907  // This function can be called from a signal handler. We essentially can't do
908  // anything here because the state of the program can be pretty much anything,
909  // including inside a malloc() or gomp lock. So can't call any function which needs
910  // malloc or a #pragma omp critical.
911  //
912  // So just increment an atomic int.
913  //
914 
915  shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
916 
917  }
918 
926  inline void requestPeriodicStatusReport(int milliseconds)
927  {
928 #pragma omp critical
929  {
930  shared_data.status_report_periodic_interval = milliseconds;
931  }
932  }
933 
945  inline void requestInterrupt()
946  {
947  shared_data.interrupt_requested = 1;
948  }
949 
950 
951 }; // class TaskDispatcher
952 
953 
957 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
958  typename LoggerType_, typename CountIntType_ = int>
959 inline TaskDispatcher<TaskType_, TaskCData_, ResultsCollector_,
960  LoggerType_, CountIntType_>
961 makeTaskDispatcher(TaskCData_ * pcdata_, ResultsCollector_ * results_, LoggerType_ & logger_,
962  CountIntType_ num_total_runs_, CountIntType_ n_chunk_ = 1)
963 {
964  // RVO should be rather obvious to the compiler
965  return TaskDispatcher<TaskType_, TaskCData_, ResultsCollector_,
966  LoggerType_, CountIntType_>(
967  pcdata_, results_, logger_, num_total_runs_, n_chunk_
968  );
969 }
970 
971 
972 
973 } // namespace OMP
974 } // namespace MultiProc
975 
976 } // namespace Tomographer
977 
978 
979 
980 
981 
982 #endif
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
Definition: multiprocomp.h:223
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Definition: multiprocomp.h:365
Base namespace for the Tomographer project.
Definition: densellh.h:45
void run()
Run the specified tasks.
Definition: multiprocomp.h:705
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
Definition: multiprocomp.h:242
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Definition: multiprocomp.h:361
T duration_cast(T... args)
TaskDispatcher< TaskType_, TaskCData_, ResultsCollector_, LoggerType_, CountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, ResultsCollector_ *results_, LoggerType_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_=1)
Create an OMP task dispatcher. Useful if you want C++&#39;s template argument deduction mechanism...
Definition: multiprocomp.h:961
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
Definition: multiprocomp.h:886
Base logger class.
Definition: loggers.h:428
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
Definition: multiprocomp.h:369
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
Definition: multiprocomp.h:367
STL class.
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:393
ResultsCollector_ ResultsCollector
The type which is responsible to collect the final results of the individual tasks.
Definition: multiprocomp.h:363
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
Definition: multiprocomp.h:255
void requestInterrupt()
Request an immediate interruption of the tasks.
Definition: multiprocomp.h:945
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
Definition: multiprocomp.h:371
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
Definition: multiprocomp.h:359
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
Definition: multiprocomp.h:378
Some C++ utilities, with a tad of C++11 tricks.
STL class.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:352
T c_str(T... args)
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Definition: multiprocomp.h:208
Dispatches tasks to parallel threads using OpenMP.
Definition: multiprocomp.h:353
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType n_chunk_=1)
Task dispatcher constructor.
Definition: multiprocomp.h:695
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
Definition: multiprocomp.h:926
STL class.
#define tomographer_assert(...)
Assertion test macro.
Definition: cxxdefs.h:83
Utilities for logging messages.
void requestStatusReport()
Request a status report.
Definition: multiprocomp.h:904
A complete status report, abstract version.
Definition: multiproc.h:85