28 #ifndef MULTIPROCOMP_H 29 #define MULTIPROCOMP_H 39 inline constexpr
int omp_get_thread_num() {
return 0; }
// Stand-in with the same name and signature as OpenMP's omp_get_num_threads():
// always reports exactly one thread.
// NOTE(review): presumably compiled only when the real <omp.h> is unavailable;
// the guarding preprocessor lines are not visible in this listing -- confirm.
40 inline constexpr
int omp_get_num_threads() {
return 1; }
43 #include <boost/exception/diagnostic_information.hpp> 50 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP 53 #define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x)) 56 #define TOMOGRAPHER_SLEEP_FOR_MS(x) std::this_thread::sleep_for(std::chrono::milliseconds((x))) 73 namespace tomo_internal {
// General form of the helper that forwards logging calls to a base logger on
// behalf of ThreadSanitizerLogger.  A separate specialization exists for
// baseLoggerIsThreadSafe == true; this primary template is the one that also
// records exceptions raised by the base logger.
//
// NOTE(review): the lines that declare `exception_str` / `ok`, open the
// protected region, catch the exception and rethrow/return are not visible in
// this listing; the comments below only describe what is shown.
89 template<
typename BaseLogger,
bool baseLoggerIsThreadSafe>
90 struct ThreadSanitizerLoggerHelper
// Forwards one log message (level, origin, msg) to baselogger.emitLog().
92 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
// Set to true only if the forwarded call raised an exception.
94 bool got_exception =
false;
100 baselogger.emitLog(level, origin, msg);
// Failure path: remember that an exception happened and keep its diagnostic
// text, prefixed with the name of the call that failed.
102 got_exception =
true;
103 exception_str =
std::string(
"Caught exception in emitLog: ") + boost::current_exception_diagnostic_information();
// Forwards an origin-filter query to baselogger.filterByOrigin() and stores
// its answer in `ok`.
110 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
112 bool got_exception =
false;
120 ok = baselogger.filterByOrigin(level, origin);
// Failure path, mirroring emitLog() above.  The prefix names the failing call
// (previously misspelled "filterByOrigni").
122 got_exception =
true;
123 exception_str =
std::string(
"Caught exception in filterByOrigin: ")
124 + boost::current_exception_diagnostic_information();
// Specialization of ThreadSanitizerLoggerHelper selected when the second
// template argument (baseLoggerIsThreadSafe) is true.  Both members simply
// forward to the base logger; no exception bookkeeping is visible here.
138 template<
typename BaseLogger>
139 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
// Passes the message straight through to baselogger.emitLog().
141 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
144 baselogger.emitLog(level, origin, msg);
// Returns whatever baselogger.filterByOrigin() returns for (level, origin).
146 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
149 return baselogger.filterByOrigin(level, origin);
207 template<
typename BaseLogger>
210 BaseLogger & _baselogger;
221 template<
typename... MoreArgs>
244 tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
247 _baselogger, level, origin, msg
252 template<
bool dummy = true>
256 return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
259 _baselogger, level, origin
274 template<
typename BaseLogger>
289 namespace MultiProc {
349 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
350 typename LoggerType_,
typename CountIntType_ = int,
351 typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
382 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 383 std::chrono::monotonic_clock
// Visible members of TaskInterruptedInnerException, the exception thrown by
// statusReportRequested() when shared_data->interrupt_requested is set and
// caught again in _run_task().
// NOTE(review): the class header (base class, `msg` member declaration) is not
// visible in this listing.
//
// Default constructor: fixes the message to "Task Interrupted".
392 TaskInterruptedInnerException() : msg(
"Task Interrupted") { }
// Non-throwing destructor.  `noexcept` replaces the deprecated dynamic
// exception specification `throw()` (removed in C++20); the meaning is the
// same.
393 virtual ~TaskInterruptedInnerException()
noexcept { }
// Returns the stored message as a C string; the pointer stays valid only as
// long as this exception object (and therefore `msg`) is alive.
394 const char * what()
const noexcept {
return msg.
c_str(); }
// Visible members of TaskInnerException, the exception used in this file to
// re-raise, as a plain message, a failure captured elsewhere (status-report
// callback, status-report processing).
// NOTE(review): the class header (base class, `msg` member declaration) is not
// visible in this listing.
//
// Constructor: stores "Task raised an exception: " followed by `msgexc`.
399 TaskInnerException(
std::string msgexc) : msg(
"Task raised an exception: "+msgexc) { }
// Non-throwing destructor.  `noexcept` replaces the deprecated dynamic
// exception specification `throw()` (removed in C++20); the meaning is the
// same.
400 virtual ~TaskInnerException()
noexcept { }
// Returns the stored message as a C string; the pointer stays valid only as
// long as this exception object (and therefore `msg`) is alive.
401 const char * what()
const noexcept {
return msg.
c_str(); }
// State shared by every worker of the task dispatcher: pointers to the
// constant task data and the results collector, the status-report bookkeeping,
// and the run counters.
// NOTE(review): some members used elsewhere in this file (logger,
// status_report_counter, interrupt_requested) are initialised below but their
// declarations are not visible in this listing.
405 struct thread_shared_data {
// Stores the given pointers/counters and puts the status-report machinery in
// its idle state.
406 thread_shared_data(
const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
407 CountIntType num_total_runs_, CountIntType n_chunk_)
412 status_report_underway(
false),
413 status_report_initialized(
false),
414 status_report_ready(
false),
415 status_report_counter(0),
// -1: callers in this file only act on the interval when it is > 0.
416 status_report_periodic_interval(-1),
417 status_report_numreportsrecieved(0),
418 status_report_full(),
419 status_report_user_fn(),
420 interrupt_requested(0),
421 num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
422 num_active_working_threads(0)
// Constant data handed to every task (getTaskInput(), task constructor, run()).
425 const TaskCData * pcdata;
// Receives init(), collectResult() and runsFinished() calls.
426 ResultsCollector * results;
// Assigned from StdClockType::now() at the start of the run; used to compute
// the elapsed time of a status report.
429 StdClockType::time_point time_start;
// Set when the first report of a round arrives in submitStatusReport();
// cleared again after the user callback has run.
431 bool status_report_underway;
432 bool status_report_initialized;
// Set once the number of received reports equals num_active_working_threads;
// tells thread 0 to invoke the user callback.
433 bool status_report_ready;
// Interval assigned from a `milliseconds` argument elsewhere in this file.
435 int status_report_periodic_interval;
// Number of per-thread reports received in the current round.
436 CountIntType status_report_numreportsrecieved;
// Aggregated report handed to status_report_user_fn.
438 FullStatusReportType status_report_full;
// User callback; tested for emptiness before a round is started.
439 FullStatusReportCallbackType status_report_user_fn;
// Total number of task runs requested and the chunk size passed to the
// results collector's init().
443 CountIntType num_total_runs;
444 CountIntType n_chunk;
// Incremented each time _run_task() finishes collecting a result.
445 CountIntType num_completed;
// Incremented when _run_task() starts working, decremented when it is done.
447 CountIntType num_active_working_threads;
450 struct thread_private_data
452 thread_shared_data * shared_data;
454 TaskLoggerType * logger;
457 CountIntType local_status_report_counter;
// Polled by a running task.  Returns true when this thread's
// local_status_report_counter differs from the shared status_report_counter,
// i.e. a report has been requested since this thread last synchronised.
// Throws TaskInterruptedInnerException if an interrupt was requested.
// NOTE(review): the braces, the declaration of `exception_str` and the
// try/catch lines are not visible in this listing.
459 inline bool statusReportRequested()
const 461 if (shared_data->interrupt_requested) {
462 throw TaskInterruptedInnerException();
// Extra duties carried out only by thread number 0.
469 if (omp_get_thread_num() == 0) {
// With a positive periodic interval, refresh the shared counter from the clock.
472 if (shared_data->status_report_periodic_interval > 0) {
473 _master_thread_update_status_report_periodic_interval_counter();
// A complete report is waiting: hand it to the user callback, then reset all
// round state so that a new report can be assembled.
478 if (shared_data->status_report_ready) {
479 bool got_exception =
false;
485 shared_data->status_report_user_fn(shared_data->status_report_full);
487 shared_data->status_report_numreportsrecieved = 0;
488 shared_data->status_report_underway =
false;
489 shared_data->status_report_initialized =
false;
490 shared_data->status_report_ready =
false;
491 shared_data->status_report_full.workers_running.clear();
492 shared_data->status_report_full.workers_reports.clear();
// Failure path: record the diagnostic text of the exception ...
494 got_exception =
true;
495 exception_str =
std::string(
"Caught exception in status reporting callback: ")
496 + boost::current_exception_diagnostic_information();
// ... and re-raise it as a TaskInnerException carrying that text.
500 throw TaskInnerException(exception_str);
// Both counters are compared after conversion to int.
505 return (
int)local_status_report_counter != (int)shared_data->status_report_counter;
// Recomputes the shared status_report_counter from the clock: the time since
// the clock's epoch is divided by status_report_periodic_interval and the
// result is masked with 0x00FFFFFF.  The counter therefore changes value once
// per interval, which is what statusReportRequested() detects.
// NOTE(review): the line converting the duration to a tick unit is not visible
// in this listing (presumably milliseconds -- confirm).
509 inline void _master_thread_update_status_report_periodic_interval_counter()
const 511 shared_data->status_report_counter = (
513 StdClockType::now().time_since_epoch()
514 ).count() / shared_data->status_report_periodic_interval) & 0x00FFFFFF
// Called by a task to hand in its individual status report.  The report is
// stored in the slot of the calling thread inside
// shared_data->status_report_full; once as many reports as there are active
// working threads have arrived, the full report is flagged as ready.
// Failures are captured as text and re-raised as TaskInnerException.
// NOTE(review): braces, the declaration of `exception_str`, try/catch lines
// and any critical-section pragma are not visible in this listing.
521 inline void submitStatusReport(
const TaskStatusReportType &statreport)
// Equal counters mean no report was requested from this thread: warn.
523 if ((
int)local_status_report_counter == (
int)shared_data->status_report_counter) {
525 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
"Task submitted unsolicited status report");
530 bool got_exception =
false;
// Mark the current request as answered by this thread.
538 local_status_report_counter = shared_data->status_report_counter;
541 int threadnum = omp_get_thread_num();
545 shared_data->logger.longdebug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
546 stream <<
"status report received for thread #" << threadnum <<
", treating it ...";
// First report of a round: set up the shared full report.
553 if (!shared_data->status_report_initialized) {
558 if (shared_data->status_report_underway) {
562 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
563 "status report already underway!");
566 if (!shared_data->status_report_user_fn) {
570 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
571 "no user status report handler set!" 572 " call setStatusReportHandler() first.");
579 shared_data->status_report_underway =
true;
580 shared_data->status_report_initialized =
true;
581 shared_data->status_report_ready =
false;
// Start from a fresh report and fill in the global progress figures.
584 shared_data->status_report_full = FullStatusReportType();
585 shared_data->status_report_full.num_completed = shared_data->num_completed;
586 shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
588 StdClockType::now() - shared_data->time_start
591 int num_threads = omp_get_num_threads();
// One slot per thread; workers_running starts out all false.
596 shared_data->status_report_full.workers_running.clear();
597 shared_data->status_report_full.workers_reports.clear();
598 shared_data->status_report_full.workers_running.resize(num_threads,
false);
599 shared_data->status_report_full.workers_reports.resize(num_threads);
601 shared_data->status_report_numreportsrecieved = 0;
605 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
606 stream <<
"vectors resized to workers_running.size()=" 607 << shared_data->status_report_full.workers_running.size()
608 <<
" and workers_reports.size()=" 609 << shared_data->status_report_full.workers_reports.size()
625 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface",
"threadnum=%ld, workers_reports.size()=%ld",
626 (
long)threadnum, (
long)shared_data->status_report_full.workers_reports.size());
// The thread number is used as an index below, so it must be in range.
628 tomographer_assert(0 <= threadnum &&
629 (
std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
// Store this thread's report in its own slot.
631 shared_data->status_report_full.workers_running[threadnum] =
true;
632 shared_data->status_report_full.workers_reports[threadnum] = statreport;
634 ++ shared_data->status_report_numreportsrecieved;
// All active workers have reported: the full report can be delivered.
636 if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
643 shared_data->status_report_ready =
true;
// Failure path: keep the diagnostic text and re-raise it as TaskInnerException.
659 got_exception =
true;
660 exception_str =
std::string(
"Caught exception while processing status report: ")
661 + boost::current_exception_diagnostic_information();
665 throw TaskInnerException(exception_str);
670 thread_shared_data shared_data;
694 TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
695 CountIntType num_total_runs_, CountIntType n_chunk_)
696 : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
706 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"Let's go!");
707 shared_data.time_start = StdClockType::now();
709 shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
711 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"preparing for parallel runs");
714 shared_data.logger.warning(
"MultiProc::OMP::TaskDispatcher::run()",
"OpenMP is disabled; tasks will run serially.");
719 CountIntType num_total_runs = shared_data.num_total_runs;
720 CountIntType n_chunk = shared_data.n_chunk;
725 thread_shared_data *shdat = &shared_data;
726 thread_private_data privdat;
728 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"About to start parallel section");
730 int num_active_parallel = 0;
734 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk, num_active_parallel, inner_exception) 736 privdat.shared_data = shdat;
740 ++num_active_parallel;
745 #pragma omp for schedule(dynamic,n_chunk) nowait 746 for (k = 0; k < num_total_runs; ++k) {
753 _run_task(privdat, shdat, k);
758 shdat->interrupt_requested = 1;
759 inner_exception +=
std::string(
"Exception caught inside task: ")
760 + boost::current_exception_diagnostic_information() +
"\n";
767 --num_active_parallel;
771 if (shdat->status_report_periodic_interval > 0) {
773 while (num_active_parallel > 0) {
774 TOMOGRAPHER_SLEEP_FOR_MS(shdat->status_report_periodic_interval);
775 privdat._master_thread_update_status_report_periodic_interval_counter();
782 if (inner_exception.
size()) {
787 if (shared_data.interrupt_requested) {
791 shared_data.results->runsFinished(num_total_runs, shared_data.pcdata);
// Runs task number `k`: builds a per-task logger, obtains the task input from
// the constant data, constructs and runs the task, then passes its result to
// the results collector and updates the shared counters.
//   privdat - per-thread data; receives the logger pointer and the
//             synchronised status-report counter
//   shdat   - data shared between all workers
//   k       - index of the task to run
// NOTE(review): braces, the declaration of `exception_str`, the try/catch
// lines and any critical-section pragma are not visible in this listing.
795 void _run_task(thread_private_data & privdat, thread_shared_data * shdat, CountIntType k)
// Checked before any work is started for this task.
800 if (shdat->interrupt_requested) {
806 ++ shdat->num_active_working_threads;
// Start in sync so that only requests made from now on count as pending.
807 privdat.local_status_report_counter = shdat->status_report_counter;
811 TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
814 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
815 "Run #%lu: thread-safe logger set up", (
unsigned long)k);
// The task reaches this logger through the private data passed to run().
819 privdat.logger = &threadsafelogger;
822 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
823 "Run #%lu: querying CData for task input", (
unsigned long)k);
825 auto input = shdat->pcdata->getTaskInput(k);
828 threadsafelogger.debug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
829 "Running task #%lu ...", (
unsigned long)k);
832 TaskType t(input, shdat->pcdata, threadsafelogger);
835 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
836 "Task #%lu set up.", (
unsigned long)k);
// &privdat is the interface through which the task polls and submits status
// reports; an interrupt surfaces here as TaskInterruptedInnerException.
840 t.run(shdat->pcdata, threadsafelogger, &privdat);
841 }
catch (
const TaskInterruptedInnerException & ) {
845 bool got_exception =
false;
850 shdat->results->collectResult(k, t.getResult(), shdat->pcdata);
// Failure path of collectResult().  The message now names result collection;
// it used to be a copy of the status-report message and misreported where the
// exception came from.
852 got_exception =
true;
853 exception_str =
std::string(
"Caught exception while collecting task result: ")
854 + boost::current_exception_diagnostic_information();
// The task ended without answering a pending status-report request: count it
// as having reported, so that the round can still reach the number of
// active working threads.
857 if ((
int)privdat.local_status_report_counter != (
int)shdat->status_report_counter) {
860 ++ shdat->status_report_numreportsrecieved;
863 ++ shdat->num_completed;
864 -- shdat->num_active_working_threads;
889 shared_data.status_report_user_fn = fnstatus;
914 shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
929 shared_data.status_report_periodic_interval = milliseconds;
946 shared_data.interrupt_requested = 1;
956 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
957 typename LoggerType_,
typename CountIntType_ =
int>
959 LoggerType_, CountIntType_>
961 CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
965 LoggerType_, CountIntType_>(
966 pcdata_, results_, logger_, num_total_runs_, n_chunk_
TaskType_ TaskType
The task type.
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Base namespace for the Tomographer project.
void run()
Run the specified tasks.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
T duration_cast(T... args)
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
Assign a callable to be called whenever a status report is requested.
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
ResultsCollector_ ResultsCollector
The type which is responsible for collecting the final results of the individual tasks.
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
TaskDispatcher< TaskType_, TaskCData_, ResultsCollector_, LoggerType_, CountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, ResultsCollector_ *results_, LoggerType_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType n_chunk_)
Task dispatcher constructor.
void requestInterrupt()
Request an immediate interruption of the tasks.
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Dispatches tasks to parallel threads using OpenMP.
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
Utilities for logging messages.
void requestStatusReport()
Request a status report.
A complete status report, abstract version.