// omp_get_thread_num(): stand-in definition that always reports thread index 0.
// NOTE(review): presumably only compiled when the real OpenMP runtime header is
// not used -- the guarding preprocessor condition is not visible here; confirm.
28 #ifndef MULTIPROCOMP_H 29 #define MULTIPROCOMP_H 38 inline constexpr
int omp_get_thread_num() {
return 0; }
/**
 * \brief Stand-in for omp_get_num_threads() that always reports a single thread.
 *
 * Usable in constant expressions.
 *
 * NOTE(review): presumably only compiled when the real OpenMP runtime header is
 * not used -- the guarding preprocessor condition is not visible here; confirm.
 *
 * \return Always 1.
 */
inline constexpr int omp_get_num_threads()
{
  return 1;
}
42 #include <boost/exception/diagnostic_information.hpp> 49 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP 52 # define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x)) 56 # ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD 57 # include <mingw.thread.h> 59 # define TOMOGRAPHER_SLEEP_FOR_MS(x) \ 60 std::this_thread::sleep_for(std::chrono::milliseconds((x))) 77 namespace tomo_internal {
// Generic helper used to route calls to a wrapped logger. The second template
// argument states whether the wrapped logger is itself thread-safe; a separate
// specialization exists for the `true` case.
93 template<
typename BaseLogger,
bool baseLoggerIsThreadSafe>
94 struct ThreadSanitizerLoggerHelper
// emitLog(): forwards (level, origin, msg) to baselogger.emitLog().
// A failure while forwarding is recorded in got_exception / exception_str, the
// latter holding "Caught exception in emitLog: " followed by Boost's diagnostic
// text for the current exception.
// NOTE(review): the statements that guard the call and that act on
// got_exception afterwards are not part of the visible lines.
96 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
98 bool got_exception =
false;
104 baselogger.emitLog(level, origin, msg);
106 got_exception =
true;
107 exception_str =
std::string(
"Caught exception in emitLog: ") + boost::current_exception_diagnostic_information();
// filterByOrigin(): asks the wrapped logger whether a message of the given
// level and origin should be emitted, storing the answer in `ok`.
// A failure while querying is recorded in got_exception / exception_str; the
// diagnostic text names this function so the failure can be traced back to it.
// NOTE(review): the statements that guard the call and that act on
// got_exception afterwards are not part of the visible lines.
114 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
116 bool got_exception =
false;
124 ok = baselogger.filterByOrigin(level, origin);
126 got_exception =
true;
127 exception_str =
std::string(
"Caught exception in filterByOrigin: ")
128 + boost::current_exception_diagnostic_information();
// Specialization selected when the second template argument is `true`.
// In the lines visible here both members forward straight to the wrapped
// logger, without the got_exception / exception_str bookkeeping used by the
// generic helper.
142 template<
typename BaseLogger>
143 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
// emitLog(): hands (level, origin, msg) to baselogger.emitLog() unchanged.
145 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
148 baselogger.emitLog(level, origin, msg);
// filterByOrigin(): returns whatever baselogger.filterByOrigin() returns.
150 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
153 return baselogger.filterByOrigin(level, origin);
// Wrapper logger used to call a (possibly non-thread-safe) base logger from
// worker threads. Only fragments of the class are visible here.
211 template<
typename BaseLogger>
// Reference to the wrapped logger; the calls below pass it on to
// tomo_internal::ThreadSanitizerLoggerHelper.
215 BaseLogger & _baselogger;
// Template head of a member taking extra, arbitrary arguments.
// NOTE(review): presumably the constructor, which accepts and ignores
// additional arguments -- its signature is not visible here; confirm.
226 template<
typename... MoreArgs>
// Message emission: delegated to ThreadSanitizerLoggerHelper with
// (_baselogger, level, origin, msg).
249 tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
252 _baselogger, level, origin, msg
// Origin filtering: delegated to ThreadSanitizerLoggerHelper with
// (_baselogger, level, origin) and its result returned. The `dummy` template
// parameter defaults to true.
257 template<
bool dummy = true>
261 return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
264 _baselogger, level, origin
279 template<
typename BaseLogger>
280 struct TOMOGRAPHER_EXPORT
LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> >
295 namespace MultiProc {
356 template<
typename TaskType_,
typename TaskCData_,
typename LoggerType_,
357 typename TaskCountIntType_ = int,
358 typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
389 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 390 std::chrono::monotonic_clock
// Default constructor: stores the fixed text "Task Interrupted" in msg.
399 TaskInterruptedInnerException() : msg(
"Task Interrupted") { }
// Destructor: empty body; declared non-throwing with `noexcept`, which
// replaces the deprecated dynamic exception specification `throw()`.
400 virtual ~TaskInterruptedInnerException()
noexcept { };
// what(): returns the stored message as a C string; the pointer refers to
// storage held by msg. Declared non-throwing with `noexcept`, which replaces
// the deprecated dynamic exception specification `throw()`.
401 const char * what()
const noexcept {
return msg.
c_str(); }
// Constructor: stores "Task raised an exception: " followed by the supplied
// text (msgexc) in msg.
406 TaskInnerException(
std::string msgexc) : msg(
"Task raised an exception: "+msgexc) { }
// Destructor: empty body; declared non-throwing with `noexcept`, which
// replaces the deprecated dynamic exception specification `throw()`.
407 virtual ~TaskInnerException()
noexcept { };
// what(): returns the stored message as a C string; the pointer refers to
// storage held by msg. Declared non-throwing with `noexcept`, which replaces
// the deprecated dynamic exception specification `throw()`.
408 const char * what()
const noexcept {
return msg.
c_str(); }
// State of one dispatcher that the worker-side code reaches through a
// thread_shared_data pointer (see thread_private_data::shared_data and the
// `shdat` pointer used in run() / _run_task()).
412 struct thread_shared_data {
// Constructor: takes the constant task data, the logger, the total number of
// runs and the chunk size. In the initializers visible here all status-report
// flags start false, the counters start at 0, the periodic interval starts at
// -1 and no interrupt is requested.
413 thread_shared_data(
const TaskCData * pcdata_, LoggerType & logger_,
414 TaskCountIntType num_total_runs_, TaskCountIntType n_chunk_)
419 status_report_underway(
false),
420 status_report_initialized(
false),
421 status_report_ready(
false),
422 status_report_counter(0),
423 status_report_periodic_interval(-1),
424 status_report_numreportsrecieved(0),
425 status_report_full(),
426 status_report_user_fn(),
427 interrupt_requested(0),
428 num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
429 num_active_working_threads(0)
// Constant task data; queried for each task's input and passed to the task's
// constructor and run() in _run_task().
432 const TaskCData * pcdata;
// Set when run() starts; submitStatusReport() subtracts it from the current
// clock reading.
436 StdClockType::time_point time_start;
// Set to true when the collection of a full status report begins.
438 bool status_report_underway;
// True once the full-report structure has been set up for the current request.
439 bool status_report_initialized;
// True once the number of received reports equals the number of active
// working threads; statusReportRequested() then invokes the user callback.
440 bool status_report_ready;
// Periodic reporting interval; only acted upon when > 0. The value is passed to
// TOMOGRAPHER_SLEEP_FOR_MS() and assigned from a parameter named `milliseconds`.
442 int status_report_periodic_interval;
// Number of per-thread reports collected for the current request.
443 TaskCountIntType status_report_numreportsrecieved;
// The aggregated report handed to the user callback.
445 FullStatusReportType status_report_full;
// User callback receiving the aggregated report; may be empty (checked before
// a report is assembled).
446 FullStatusReportCallbackType status_report_user_fn;
// Total number of task runs; upper bound of the task loop in run().
450 TaskCountIntType num_total_runs;
// Chunk size given to the `schedule(dynamic, n_chunk)` clause in run().
451 TaskCountIntType n_chunk;
// Incremented at the end of _run_task(); copied into each full status report.
452 TaskCountIntType num_completed;
// Incremented when _run_task() starts a task and decremented at its end.
454 TaskCountIntType num_active_working_threads;
// Per-thread state; an instance is listed in the `private(...)` clause of the
// parallel region in run() and is handed to each task's run() as a pointer.
457 struct thread_private_data
// The dispatcher-wide state this thread works against.
459 thread_shared_data * shared_data;
// Logger for the task currently running on this thread; set in _run_task().
461 TaskLoggerType * logger;
// NOTE(review): presumably the index of the task being run -- no use of this
// member is visible here; confirm.
463 TaskCountIntType kiter;
// Last value of shared_data->status_report_counter this thread has reacted to;
// a differing shared value means a status report is pending for this thread.
464 int local_status_report_counter;
// statusReportRequested(): polled by a running task.
//  - Throws TaskInterruptedInnerException when an interrupt has been requested.
//  - On thread 0, refreshes the periodic-report counter when a periodic
//    interval is set.
//  - When a full report is ready, hands it to the user callback and resets the
//    report state; a failure in that step is rethrown as TaskInnerException.
//  - Returns true when this thread's local counter differs from the shared
//    counter, i.e. a report is expected from the caller.
// NOTE(review): the nesting of the conditions below and the statements that
// guard the callback are not part of the visible lines.
466 inline bool statusReportRequested()
const 468 if (shared_data->interrupt_requested) {
469 throw TaskInterruptedInnerException();
// Work reserved for the thread whose OpenMP thread number is 0.
476 if (omp_get_thread_num() == 0) {
479 if (shared_data->status_report_periodic_interval > 0) {
480 _master_thread_update_status_report_periodic_interval_counter();
// A complete report is available: deliver it, then reset the bookkeeping.
485 if (shared_data->status_report_ready) {
486 bool got_exception =
false;
// The report is moved into the callback; the containers are cleared below.
492 shared_data->status_report_user_fn(
std::move(shared_data->status_report_full));
494 shared_data->status_report_numreportsrecieved = 0;
495 shared_data->status_report_underway =
false;
496 shared_data->status_report_initialized =
false;
497 shared_data->status_report_ready =
false;
498 shared_data->status_report_full.workers_running.clear();
499 shared_data->status_report_full.workers_reports.clear();
// Failure path: remember what went wrong so it can be rethrown below.
501 got_exception =
true;
502 exception_str =
std::string(
"Caught exception in status reporting callback: ")
503 + boost::current_exception_diagnostic_information();
507 throw TaskInnerException(exception_str);
// A differing counter means a report has been requested from this thread.
512 return local_status_report_counter != (int)shared_data->status_report_counter;
// Recomputes shared_data->status_report_counter from the current clock reading:
// the reading's tick count is divided by the periodic interval and the result
// is masked to its low 24 bits (0x00FFFFFF).
// NOTE(review): presumably the clock reading is converted to milliseconds so
// that the counter changes once per interval -- the conversion is not part of
// the visible lines; confirm. The caller must ensure the interval is non-zero.
516 inline void _master_thread_update_status_report_periodic_interval_counter()
const 518 shared_data->status_report_counter = (
520 StdClockType::now().time_since_epoch()
521 ).count() / shared_data->status_report_periodic_interval) & 0x00FFFFFF
// submitStatusReport(): called by a task to hand in its individual report.
// The first report of a request sets up the full-report structure; every
// report is stored in the slot of the calling thread; once as many reports
// have arrived as there are active working threads, the full report is marked
// ready. A failure during processing is rethrown as TaskInnerException.
// NOTE(review): the statements that guard this processing (and any
// synchronization around it) are not part of the visible lines.
528 inline void submitStatusReport(
const TaskStatusReportType &statreport)
// Equal counters mean no report was pending for this thread: warn about it.
530 if (local_status_report_counter == (
int)shared_data->status_report_counter) {
532 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
"Task submitted unsollicited status report");
537 bool got_exception =
false;
// Mark the pending request as answered by this thread.
545 local_status_report_counter = shared_data->status_report_counter;
// The OpenMP thread number selects this thread's slot in the full report.
548 int threadnum = omp_get_thread_num();
552 shared_data->logger.longdebug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
553 stream <<
"status report received for thread #" << threadnum <<
", treating it ...";
// First report of this request: the full-report structure is set up below.
560 if (!shared_data->status_report_initialized) {
565 if (shared_data->status_report_underway) {
569 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
570 "status report already underway!");
573 if (!shared_data->status_report_user_fn) {
577 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
578 "no user status report handler set!" 579 " call setStatusReportHandler() first.");
// Begin a new collection round.
586 shared_data->status_report_underway =
true;
587 shared_data->status_report_initialized =
true;
588 shared_data->status_report_ready =
false;
// Start from a fresh report and copy the global progress figures into it.
591 shared_data->status_report_full = FullStatusReportType();
592 shared_data->status_report_full.num_completed = shared_data->num_completed;
593 shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
595 StdClockType::now() - shared_data->time_start
// One slot per thread of the current team; all slots start as "not running".
598 int num_threads = omp_get_num_threads();
603 shared_data->status_report_full.workers_running.clear();
604 shared_data->status_report_full.workers_reports.clear();
605 shared_data->status_report_full.workers_running.resize((
std::size_t)num_threads,
false);
606 shared_data->status_report_full.workers_reports.resize((
std::size_t)num_threads);
608 shared_data->status_report_numreportsrecieved = 0;
612 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
613 stream <<
"vectors resized to workers_running.size()=" 614 << shared_data->status_report_full.workers_running.size()
615 <<
" and workers_reports.size()=" 616 << shared_data->status_report_full.workers_reports.size()
632 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface",
"threadnum=%ld, workers_reports.size()=%ld",
633 (
long)threadnum, (
long)shared_data->status_report_full.workers_reports.size());
// NOTE(review): presumably the argument of an assertion that the slot index is
// within range -- the start of the statement is not visible; confirm.
636 (
std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
// Store this thread's report in its slot and flag the slot as running.
638 shared_data->status_report_full.workers_running[(
std::size_t)threadnum] =
true;
639 shared_data->status_report_full.workers_reports[(
std::size_t)threadnum] = statreport;
641 ++ shared_data->status_report_numreportsrecieved;
// All active working threads have reported: the full report can be delivered.
643 if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
650 shared_data->status_report_ready =
true;
// Failure path: remember what went wrong so it can be rethrown below.
666 got_exception =
true;
667 exception_str =
std::string(
"Caught exception while processing status report: ")
668 + boost::current_exception_diagnostic_information();
672 throw TaskInnerException(exception_str);
677 thread_shared_data shared_data;
// Task dispatcher constructor.
//   pcdata_         constant data shared by all tasks
//   logger_         logger used by the dispatcher
//   num_total_runs_ total number of task instances to run
//   n_chunk_        chunk size (defaults to 1); used as the chunk size of the
//                   dynamic schedule in run()
// All four arguments are forwarded to the shared_data member.
698 TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_, TaskCountIntType num_total_runs_,
699 TaskCountIntType n_chunk_ = 1)
700 : shared_data(pcdata_, logger_, num_total_runs_, n_chunk_)
706 for (
auto r : shared_data.results) {
// Body of run(): runs all task instances inside an OpenMP parallel region.
// NOTE(review): the function head, the preprocessor conditions around the
// "OpenMP is disabled" warning and several guarding statements are not part of
// the visible lines.
719 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"Let's go!");
// Reference point for the elapsed time computed in status reports.
720 shared_data.time_start = StdClockType::now();
// One result slot per run, initially null; _run_task() fills slot k.
722 shared_data.results.resize((
std::size_t)shared_data.num_total_runs, NULL);
724 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"preparing for parallel runs");
727 shared_data.logger.warning(
"MultiProc::OMP::TaskDispatcher::run()",
"OpenMP is disabled; tasks will run serially.");
// Local copies / pointers: the parallel region below uses default(none), so
// everything it touches is listed explicitly in its private/shared clauses.
732 TaskCountIntType num_total_runs = shared_data.num_total_runs;
733 TaskCountIntType n_chunk = shared_data.n_chunk;
736 TaskCountIntType k = 0;
738 thread_shared_data *shdat = &shared_data;
739 thread_private_data privdat;
741 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"About to start parallel section");
// Number of threads currently inside the task loop; polled further below.
743 int num_active_parallel = 0;
// Each thread gets its own k and privdat and points privdat at the shared state.
747 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk, num_active_parallel, inner_exception) 749 privdat.shared_data = shdat;
753 ++num_active_parallel;
// Task loop: dynamic schedule with chunk size n_chunk; `nowait` lets a thread
// continue past the loop without waiting for the others.
758 #pragma omp for schedule(dynamic,n_chunk) nowait 759 for (k = 0; k < num_total_runs; ++k) {
766 _run_task(privdat, shdat, k);
// Failure path: request an interrupt and append the diagnostic text to
// inner_exception.
771 shdat->interrupt_requested = 1;
772 inner_exception +=
std::string(
"Exception caught inside task: ")
773 + boost::current_exception_diagnostic_information() +
"\n";
780 --num_active_parallel;
// With periodic reports enabled, keep refreshing the report counter (sleeping
// one interval between refreshes) while other threads are still in the loop.
784 if (shdat->status_report_periodic_interval > 0) {
786 while (num_active_parallel > 0) {
787 TOMOGRAPHER_SLEEP_FOR_MS(shdat->status_report_periodic_interval);
788 privdat._master_thread_update_status_report_periodic_interval_counter();
// After the parallel region: check for collected task errors, then for an
// interrupt request (the handling statements are not visible here).
795 if (inner_exception.
size()) {
800 if (shared_data.interrupt_requested) {
810 return shared_data.num_total_runs;
817 return shared_data.results;
824 return *shared_data.results[k];
// _run_task(): runs task number k on the calling thread.
//   privdat  this thread's private state (receives the logger pointer and the
//            current status-report counter)
//   shdat    the dispatcher-wide shared state
//   k        index of the task; also the index of its result slot
// NOTE(review): several guarding statements (and any synchronization around
// the updates of shared counters) are not part of the visible lines.
828 void _run_task(thread_private_data & privdat, thread_shared_data * shdat, TaskCountIntType k)
// An interrupt request is checked before any work is done for this task.
833 if (shdat->interrupt_requested) {
// Register as an active worker and adopt the current report counter, so that
// only requests made from now on count as pending for this thread.
839 ++ shdat->num_active_working_threads;
840 privdat.local_status_report_counter = shdat->status_report_counter;
// Per-task logger wrapping the dispatcher's logger.
844 TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
847 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
848 "Run #%lu: thread-safe logger set up", (
unsigned long)k);
// The address of a local object is stored; it is only meaningful while this
// function is executing.
852 privdat.logger = &threadsafelogger;
855 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
856 "Run #%lu: querying CData for task input", (
unsigned long)k);
// The constant data object supplies the input for task k.
858 auto input = shdat->pcdata->getTaskInput(k);
861 threadsafelogger.debug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
862 "Running task #%lu ...", (
unsigned long)k);
865 TaskType t(input, shdat->pcdata, threadsafelogger);
868 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
869 "Task #%lu set up.", (
unsigned long)k);
// Run the task; &privdat is the interface through which the task polls for
// and submits status reports. An interruption is caught right after.
873 t.run(shdat->pcdata, threadsafelogger, &privdat);
874 }
catch (
const TaskInterruptedInnerException & ) {
878 bool got_exception =
false;
// The task's result is moved out of the task into a heap-allocated object
// stored in slot k of the results vector.
883 shdat->results[(
std::size_t)k] =
new TaskResultType(t.stealResult());
// Failure path: remember what went wrong while storing the result.
885 got_exception =
true;
886 exception_str =
std::string(
"Caught exception while storing result: ")
887 + boost::current_exception_diagnostic_information();
// A report request this thread has not answered is counted as received, so
// the collection does not wait for a task that has already finished.
890 if (privdat.local_status_report_counter != (
int)shdat->status_report_counter) {
893 ++ shdat->status_report_numreportsrecieved;
// Final bookkeeping: one more task completed, one fewer active worker.
896 ++ shdat->num_completed;
897 -- shdat->num_active_working_threads;
922 shared_data.status_report_user_fn = fnstatus;
947 shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
962 shared_data.status_report_periodic_interval = milliseconds;
979 shared_data.interrupt_requested = 1;
989 template<
typename TaskType_,
typename TaskCData_,
990 typename LoggerType_,
typename TaskCountIntType_ =
int>
992 LoggerType_, TaskCountIntType_>
994 TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_ = 1)
998 LoggerType_, TaskCountIntType_>(
999 pcdata_, logger_, num_total_runs_, n_chunk_
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
void requestStatusReport()
Request a status report.
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
Base namespace for the Tomographer project.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
void run()
Run the specified tasks.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
T duration_cast(T... args)
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
TaskType::ResultType TaskResultType
The type representing the result of a single task run.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
FullStatusReport< TaskStatusReportType, TaskCountIntType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
Assign a callable to be called whenever a status report is requested.
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
TaskDispatcher< TaskType_, TaskCData_, LoggerType_, TaskCountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, LoggerType_ &logger_, TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_=1)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
Some C++ utilities, with a tad of C++11 tricks.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Dispatches tasks to parallel threads using OpenMP.
TaskType_ TaskType
The task type.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
void requestInterrupt()
Request an immediate interruption of the tasks.
#define tomographer_assert(...)
Assertion test macro.
TaskDispatcher(TaskCData *pcdata_, LoggerType &logger_, TaskCountIntType num_total_runs_, TaskCountIntType n_chunk_=1)
Task dispatcher constructor.
Utilities for logging messages.
A complete status report, abstract version.