28 #ifndef MULTIPROCOMP_H 29 #define MULTIPROCOMP_H 39 inline constexpr
int omp_get_thread_num() {
return 0; }
/// Always reports a team consisting of a single thread.
///
/// NOTE(review): presumably a stand-in for the OpenMP runtime function of the
/// same name when OpenMP is unavailable -- the guarding preprocessor condition
/// is not visible here, confirm before relying on it.
///
/// @return The constant 1.
inline constexpr int omp_get_num_threads()
{
  return 1;
}
43 #include <boost/exception/diagnostic_information.hpp> 50 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP 53 #define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x)) 56 #define TOMOGRAPHER_SLEEP_FOR_MS(x) std::this_thread::sleep_for(std::chrono::milliseconds((x))) 73 namespace tomo_internal {
89 template<
typename BaseLogger,
bool baseLoggerIsThreadSafe>
90 struct ThreadSanitizerLoggerHelper
92 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
94 bool got_exception =
false;
100 baselogger.emitLog(level, origin, msg);
102 got_exception =
true;
103 exception_str =
std::string(
"Caught exception in emitLog: ") + boost::current_exception_diagnostic_information();
110 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
112 bool got_exception =
false;
120 ok = baselogger.filterByOrigin(level, origin);
122 got_exception =
true;
123 exception_str =
std::string(
"Caught exception in filterByOrigni: ")
124 + boost::current_exception_diagnostic_information();
138 template<
typename BaseLogger>
139 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
141 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
144 baselogger.emitLog(level, origin, msg);
146 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
149 return baselogger.filterByOrigin(level, origin);
207 template<
typename BaseLogger>
211 BaseLogger & _baselogger;
222 template<
typename... MoreArgs>
245 tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
248 _baselogger, level, origin, msg
253 template<
bool dummy = true>
257 return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
260 _baselogger, level, origin
275 template<
typename BaseLogger>
290 namespace MultiProc {
350 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
351 typename LoggerType_,
typename CountIntType_ = int,
352 typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
383 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 384 std::chrono::monotonic_clock
393 TaskInterruptedInnerException() : msg(
"Task Interrupted") { }
394 virtual ~TaskInterruptedInnerException()
throw() { };
395 const char * what()
const throw() {
return msg.
c_str(); }
400 TaskInnerException(
std::string msgexc) : msg(
"Task raised an exception: "+msgexc) { }
401 virtual ~TaskInnerException()
throw() { };
402 const char * what()
const throw() {
return msg.
c_str(); }
406 struct thread_shared_data {
407 thread_shared_data(
const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
408 CountIntType num_total_runs_, CountIntType n_chunk_)
413 status_report_underway(
false),
414 status_report_initialized(
false),
415 status_report_ready(
false),
416 status_report_counter(0),
417 status_report_periodic_interval(-1),
418 status_report_numreportsrecieved(0),
419 status_report_full(),
420 status_report_user_fn(),
421 interrupt_requested(0),
422 num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
423 num_active_working_threads(0)
426 const TaskCData * pcdata;
427 ResultsCollector * results;
430 StdClockType::time_point time_start;
432 bool status_report_underway;
433 bool status_report_initialized;
434 bool status_report_ready;
436 int status_report_periodic_interval;
437 CountIntType status_report_numreportsrecieved;
439 FullStatusReportType status_report_full;
440 FullStatusReportCallbackType status_report_user_fn;
444 CountIntType num_total_runs;
445 CountIntType n_chunk;
446 CountIntType num_completed;
448 CountIntType num_active_working_threads;
451 struct thread_private_data
453 thread_shared_data * shared_data;
455 TaskLoggerType * logger;
458 CountIntType local_status_report_counter;
460 inline bool statusReportRequested()
const 462 if (shared_data->interrupt_requested) {
463 throw TaskInterruptedInnerException();
470 if (omp_get_thread_num() == 0) {
473 if (shared_data->status_report_periodic_interval > 0) {
474 _master_thread_update_status_report_periodic_interval_counter();
479 if (shared_data->status_report_ready) {
480 bool got_exception =
false;
486 shared_data->status_report_user_fn(shared_data->status_report_full);
488 shared_data->status_report_numreportsrecieved = 0;
489 shared_data->status_report_underway =
false;
490 shared_data->status_report_initialized =
false;
491 shared_data->status_report_ready =
false;
492 shared_data->status_report_full.workers_running.clear();
493 shared_data->status_report_full.workers_reports.clear();
495 got_exception =
true;
496 exception_str =
std::string(
"Caught exception in status reporting callback: ")
497 + boost::current_exception_diagnostic_information();
501 throw TaskInnerException(exception_str);
506 return (
int)local_status_report_counter != (int)shared_data->status_report_counter;
510 inline void _master_thread_update_status_report_periodic_interval_counter()
const 512 shared_data->status_report_counter = (
514 StdClockType::now().time_since_epoch()
515 ).count() / shared_data->status_report_periodic_interval) & 0x00FFFFFF
522 inline void submitStatusReport(
const TaskStatusReportType &statreport)
524 if ((
int)local_status_report_counter == (
int)shared_data->status_report_counter) {
526 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
"Task submitted unsollicited status report");
531 bool got_exception =
false;
539 local_status_report_counter = shared_data->status_report_counter;
542 int threadnum = omp_get_thread_num();
546 shared_data->logger.longdebug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
547 stream <<
"status report received for thread #" << threadnum <<
", treating it ...";
554 if (!shared_data->status_report_initialized) {
559 if (shared_data->status_report_underway) {
563 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
564 "status report already underway!");
567 if (!shared_data->status_report_user_fn) {
571 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
572 "no user status report handler set!" 573 " call setStatusReportHandler() first.");
580 shared_data->status_report_underway =
true;
581 shared_data->status_report_initialized =
true;
582 shared_data->status_report_ready =
false;
585 shared_data->status_report_full = FullStatusReportType();
586 shared_data->status_report_full.num_completed = shared_data->num_completed;
587 shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
589 StdClockType::now() - shared_data->time_start
592 int num_threads = omp_get_num_threads();
597 shared_data->status_report_full.workers_running.clear();
598 shared_data->status_report_full.workers_reports.clear();
599 shared_data->status_report_full.workers_running.resize(num_threads,
false);
600 shared_data->status_report_full.workers_reports.resize(num_threads);
602 shared_data->status_report_numreportsrecieved = 0;
606 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
607 stream <<
"vectors resized to workers_running.size()=" 608 << shared_data->status_report_full.workers_running.size()
609 <<
" and workers_reports.size()=" 610 << shared_data->status_report_full.workers_reports.size()
626 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface",
"threadnum=%ld, workers_reports.size()=%ld",
627 (
long)threadnum, (
long)shared_data->status_report_full.workers_reports.size());
630 (
std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
632 shared_data->status_report_full.workers_running[threadnum] =
true;
633 shared_data->status_report_full.workers_reports[threadnum] = statreport;
635 ++ shared_data->status_report_numreportsrecieved;
637 if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
644 shared_data->status_report_ready =
true;
660 got_exception =
true;
661 exception_str =
std::string(
"Caught exception while processing status report: ")
662 + boost::current_exception_diagnostic_information();
666 throw TaskInnerException(exception_str);
671 thread_shared_data shared_data;
695 TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
696 CountIntType num_total_runs_, CountIntType n_chunk_ = 1)
697 : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
707 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"Let's go!");
708 shared_data.time_start = StdClockType::now();
710 shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
712 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"preparing for parallel runs");
715 shared_data.logger.warning(
"MultiProc::OMP::TaskDispatcher::run()",
"OpenMP is disabled; tasks will run serially.");
720 CountIntType num_total_runs = shared_data.num_total_runs;
721 CountIntType n_chunk = shared_data.n_chunk;
726 thread_shared_data *shdat = &shared_data;
727 thread_private_data privdat;
729 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"About to start parallel section");
731 int num_active_parallel = 0;
735 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk, num_active_parallel, inner_exception) 737 privdat.shared_data = shdat;
741 ++num_active_parallel;
746 #pragma omp for schedule(dynamic,n_chunk) nowait 747 for (k = 0; k < num_total_runs; ++k) {
754 _run_task(privdat, shdat, k);
759 shdat->interrupt_requested = 1;
760 inner_exception +=
std::string(
"Exception caught inside task: ")
761 + boost::current_exception_diagnostic_information() +
"\n";
768 --num_active_parallel;
772 if (shdat->status_report_periodic_interval > 0) {
774 while (num_active_parallel > 0) {
775 TOMOGRAPHER_SLEEP_FOR_MS(shdat->status_report_periodic_interval);
776 privdat._master_thread_update_status_report_periodic_interval_counter();
783 if (inner_exception.
size()) {
788 if (shared_data.interrupt_requested) {
792 shared_data.results->runsFinished(num_total_runs, shared_data.pcdata);
796 void _run_task(thread_private_data & privdat, thread_shared_data * shdat, CountIntType k)
801 if (shdat->interrupt_requested) {
807 ++ shdat->num_active_working_threads;
808 privdat.local_status_report_counter = shdat->status_report_counter;
812 TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
815 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
816 "Run #%lu: thread-safe logger set up", (
unsigned long)k);
820 privdat.logger = &threadsafelogger;
823 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
824 "Run #%lu: querying CData for task input", (
unsigned long)k);
826 auto input = shdat->pcdata->getTaskInput(k);
829 threadsafelogger.debug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
830 "Running task #%lu ...", (
unsigned long)k);
833 TaskType t(input, shdat->pcdata, threadsafelogger);
836 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
837 "Task #%lu set up.", (
unsigned long)k);
841 t.run(shdat->pcdata, threadsafelogger, &privdat);
842 }
catch (
const TaskInterruptedInnerException & ) {
846 bool got_exception =
false;
851 shdat->results->collectResult(k, t.getResult(), shdat->pcdata);
853 got_exception =
true;
854 exception_str =
std::string(
"Caught exception while processing status report: ")
855 + boost::current_exception_diagnostic_information();
858 if ((
int)privdat.local_status_report_counter != (
int)shdat->status_report_counter) {
861 ++ shdat->status_report_numreportsrecieved;
864 ++ shdat->num_completed;
865 -- shdat->num_active_working_threads;
890 shared_data.status_report_user_fn = fnstatus;
915 shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
930 shared_data.status_report_periodic_interval = milliseconds;
947 shared_data.interrupt_requested = 1;
957 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
958 typename LoggerType_,
typename CountIntType_ =
int>
960 LoggerType_, CountIntType_>
962 CountIntType_ num_total_runs_, CountIntType_ n_chunk_ = 1)
966 LoggerType_, CountIntType_>(
967 pcdata_, results_, logger_, num_total_runs_, n_chunk_
TaskType_ TaskType
The task type.
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Base namespace for the Tomographer project.
void run()
Run the specified tasks.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
T duration_cast(T... args)
TaskDispatcher< TaskType_, TaskCData_, ResultsCollector_, LoggerType_, CountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, ResultsCollector_ *results_, LoggerType_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_=1)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
Assign a callable to be called whenever a status report is requested.
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
ResultsCollector_ ResultsCollector
The type which is responsible for collecting the final results of the individual tasks.
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
void requestInterrupt()
Request an immediate interruption of the tasks.
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Dispatches tasks to parallel threads using OpenMP.
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType n_chunk_=1)
Task dispatcher constructor.
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
#define tomographer_assert(...)
Assertion test macro.
Utilities for logging messages.
void requestStatusReport()
Request a status report.
A complete status report, abstract version.