28 #ifndef MULTIPROCTHREADS_H 29 #define MULTIPROCTHREADS_H 34 #include <boost/exception/diagnostic_information.hpp> 46 #ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD 47 # include <mingw.thread.h> 48 # include <mingw.mutex.h> 64 namespace CxxThreads {
110 template<
typename BaseLogger>
117 BaseLogger & _baselogger;
130 template<
typename... MoreArgs>
146 TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
147 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
149 _baselogger.
emitLog(level, origin, msg);
154 IsBaseLoggerThreadSafe)
155 bool filterByOrigin(
int level, const
char * origin)
const 157 return _baselogger.filterByOrigin(level, origin);
161 TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
162 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
165 _baselogger.emitLog(level, origin, msg);
170 !IsBaseLoggerThreadSafe)
171 bool filterByOrigin(
int level, const
char * origin)
const 174 return _baselogger.filterByOrigin(level, origin);
189 template<
typename BaseLogger>
190 struct TOMOGRAPHER_EXPORT
LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> >
205 namespace MultiProc {
206 namespace CxxThreads {
266 template<
typename TaskType_,
typename TaskCData_,
267 typename LoggerType_,
typename TaskCountIntType_ =
int>
298 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 299 std::chrono::monotonic_clock
308 TaskInterruptedInnerException() : msg(
"Task Interrupted") { }
309 virtual ~TaskInterruptedInnerException()
throw() { };
310 const char * what()
const throw() {
return msg.
c_str(); }
315 TaskInnerException(
std::string msgexc) : msg(
"Task raised an exception: "+msgexc) { }
316 virtual ~TaskInnerException()
throw() { };
317 const char * what()
const throw() {
return msg.
c_str(); }
321 struct thread_shared_data {
322 thread_shared_data(
const TaskCData * pcdata_, LoggerType & logger_,
323 TaskCountIntType num_total_runs,
int num_threads)
329 schedule(num_total_runs, num_threads),
333 thread_shared_data(thread_shared_data && x)
340 status_report(
std::move(x.status_report))
343 const TaskCData * pcdata;
353 StdClockType::time_point time_start;
356 const int num_threads;
357 TaskCountIntType num_active_working_threads;
359 const TaskCountIntType num_total_runs;
360 TaskCountIntType num_completed;
361 TaskCountIntType num_launched;
368 Schedule(TaskCountIntType num_total_runs_,
int num_threads_)
369 : num_threads(num_threads_),
370 num_active_working_threads(0),
371 num_total_runs(num_total_runs_),
374 interrupt_requested(0),
381 num_active_working_threads(x.num_active_working_threads),
382 num_total_runs(x.num_total_runs),
383 num_completed(x.num_completed),
384 num_launched(x.num_launched),
385 interrupt_requested(x.interrupt_requested),
386 inner_exception(
std::move(x.inner_exception)),
397 int periodic_interval;
398 TaskCountIntType numreportsrecieved;
399 FullStatusReportType full_report;
400 FullStatusReportCallbackType user_fn;
411 periodic_interval(-1),
412 numreportsrecieved(0),
421 : underway(x.underway),
422 initialized(x.initialized),
424 periodic_interval(x.periodic_interval),
425 numreportsrecieved(x.numreportsrecieved),
428 counter_user(x.counter_user),
429 counter_periodic(x.counter_periodic),
436 template<
typename Struct,
typename Fn>
437 void with_lock(Struct & s, Fn fn) {
444 struct thread_private_data
448 thread_shared_data * shared_data;
450 TaskLoggerType & logger;
452 TaskCountIntType task_id;
453 int local_status_report_counter_user;
454 int local_status_report_counter_periodic;
456 thread_private_data(
int thread_id_, thread_shared_data * shared_data_, TaskLoggerType & logger_)
457 : thread_id(thread_id_),
458 shared_data(shared_data_),
461 local_status_report_counter_user(0),
462 local_status_report_counter_periodic(0)
/// Poll point: tells the caller whether this thread should submit a status
/// report now.
///
/// Throws TaskInterruptedInnerException if an interrupt has been requested on
/// the shared schedule. On thread 0 only, it also advances the periodic-report
/// counter (when a positive periodic interval and a user callback are set)
/// and, once the shared full report is flagged ready, passes it to the user
/// callback and resets the report bookkeeping.
///
/// \return true when either of this thread's local report counters differs
///         from the corresponding shared counter.
///
/// NOTE(review): several interior lines (e.g. whatever adopts the mutexes
/// locked by std::lock) are not visible in this excerpt -- confirm against
/// the full source.
466 inline bool statusReportRequested()
const 468 if (shared_data->schedule.interrupt_requested) {
469 logger.
longdebug(
"CxxThreads::thread_private_data::statusReportRequested()",
470 "tasks interrupt has been requested");
// Unwind out of the task; the worker loop in run() catches this exception type.
471 throw TaskInterruptedInnerException();
// Thread 0 does the master-side work: periodic counter update and report delivery.
477 if (thread_id == 0) {
480 if (shared_data->status_report.periodic_interval > 0 && shared_data->status_report.user_fn) {
481 _master_thread_update_status_report_periodic_interval_counter();
486 if (shared_data->status_report.ready) {
487 logger.
longdebug(
"Tomographer::MultiProc::CxxThreads::thread_private_data::statusReportRequested()",
488 "Status report is ready.");
// Take the status-report mutex and the user mutex together before calling user code.
491 std::lock(shared_data->status_report.mutex, shared_data->user_mutex);
496 shared_data->status_report.user_fn(
std::move(shared_data->status_report.full_report));
// Reset the bookkeeping so that a later request starts a fresh report.
498 shared_data->status_report.numreportsrecieved = 0;
499 shared_data->status_report.underway =
false;
500 shared_data->status_report.initialized =
false;
501 shared_data->status_report.ready =
false;
502 shared_data->status_report.full_report.workers_running.clear();
503 shared_data->status_report.full_report.workers_reports.clear();
// A report is wanted if either shared counter moved since this thread last synced its copies.
507 return local_status_report_counter_user != (int)shared_data->status_report.counter_user ||
508 local_status_report_counter_periodic != (
int)shared_data->status_report.counter_periodic;
512 inline void _master_thread_update_status_report_periodic_interval_counter()
const 515 std::chrono::duration_cast<std::chrono::milliseconds>(
516 StdClockType::now().time_since_epoch()
517 ).count() / shared_data->status_report.periodic_interval
/// Record this thread's status report in the shared full status report.
///
/// Steps visible in this excerpt:
///  - log a warning if the local counters already equal the shared ones
///    (i.e. no report was pending for this thread);
///  - sync the local counters to the shared ones;
///  - if the shared report is not yet initialized, set it up: warn if a report
///    is already underway or if no user handler is set, set the state flags,
///    create a fresh FullStatusReportType with num_completed / num_total_runs,
///    size the per-thread vectors to num_threads and zero the received count;
///  - store \a statreport in slot \c thread_id, mark that worker as running
///    and increment the received count;
///  - flag the report as ready once the received count equals the number of
///    active working threads.
///
/// \param statreport the report produced by the task running on this thread.
///
/// NOTE(review): locking around these shared writes, and any early returns
/// after the warnings, are not visible in this excerpt -- confirm against the
/// full source.
521 inline void submitStatusReport(
const TaskStatusReportType &statreport)
// Equal counters mean no request is outstanding for this thread.
523 if (local_status_report_counter_user == (
int)shared_data->status_report.counter_user &&
524 local_status_report_counter_periodic == (
int)shared_data->status_report.counter_periodic) {
526 logger.
warning(
"CxxThreads TaskDispatcher/taskmanageriface",
"Task submitted unsollicited status report");
// Mark the current request as answered by this thread.
533 local_status_report_counter_user = shared_data->status_report.counter_user;
534 local_status_report_counter_periodic = shared_data->status_report.counter_periodic;
538 stream <<
"status report received for thread #" << thread_id <<
", treating it ... " 539 <<
"numreportsrecieved=" << shared_data->status_report.numreportsrecieved
540 <<
" num_active_working_threads=" << shared_data->schedule.num_active_working_threads ;
// The first thread to arrive for a given request initializes the shared report.
547 if (!shared_data->status_report.initialized) {
552 if (shared_data->status_report.underway) {
554 logger.
warning(
"CxxThreads TaskDispatcher/taskmanageriface",
"status report already underway!");
557 if (!shared_data->status_report.user_fn) {
559 logger.
warning(
"CxxThreads TaskDispatcher/taskmanageriface",
560 "no user status report handler set! Call setStatusReportHandler() first.");
564 shared_data->status_report.underway =
true;
565 shared_data->status_report.initialized =
true;
566 shared_data->status_report.ready =
false;
// Start from an empty report and fill in the global progress figures.
569 shared_data->status_report.full_report = FullStatusReportType();
570 shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
571 shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
573 StdClockType::now() - shared_data->time_start
575 int num_threads = shared_data->schedule.num_threads;
// One slot per thread; workers_running defaults to false until a thread reports.
579 shared_data->status_report.full_report.workers_running.clear();
580 shared_data->status_report.full_report.workers_reports.clear();
581 shared_data->status_report.full_report.workers_running.resize((
std::size_t)num_threads,
false);
582 shared_data->status_report.full_report.workers_reports.resize((
std::size_t)num_threads);
584 shared_data->status_report.numreportsrecieved = 0;
586 logger.
debug(
"CxxThreads TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
587 stream <<
"vectors resized to workers_running.size()=" 588 << shared_data->status_report.full_report.workers_running.size()
589 <<
" and workers_reports.size()=" 590 << shared_data->status_report.full_report.workers_reports.size()
598 logger.
debug(
"CxxThreads TaskDispatcher/taskmanageriface",
"thread_id=%ld, workers_reports.size()=%ld",
599 (
long)thread_id, (
long)shared_data->status_report.full_report.workers_reports.size());
602 (
std::size_t)thread_id < shared_data->status_report.full_report.workers_reports.size());
// Store this thread's contribution in its own slot.
604 shared_data->status_report.full_report.workers_running[(
std::size_t)thread_id] =
true;
605 shared_data->status_report.full_report.workers_reports[(
std::size_t)thread_id] = statreport;
607 ++ shared_data->status_report.numreportsrecieved;
// Once every active worker has reported, flag the report as ready (statusReportRequested() on thread 0 checks this flag).
609 if (shared_data->status_report.numreportsrecieved == shared_data->schedule.num_active_working_threads) {
616 shared_data->status_report.ready =
true;
623 thread_shared_data shared_data;
638 TaskCountIntType num_total_runs_,
640 : shared_data(pcdata_, logger_, num_total_runs_, num_threads_)
645 : shared_data(
std::move(other.shared_data))
651 for (
auto r : shared_data.results) {
664 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"Let's go!");
665 shared_data.time_start = StdClockType::now();
667 shared_data.results.resize((
std::size_t)shared_data.schedule.num_total_runs, NULL);
669 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"preparing for parallel runs");
673 auto worker_fn_id = [&](
const int thread_id) noexcept(
true) {
676 TaskLoggerType threadsafelogger(shared_data.logger, & shared_data.user_mutex);
678 thread_private_data privdat(thread_id, & shared_data, threadsafelogger);
681 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::run()",
682 "Thread #%d: thread-safe logger and private thread data set up", thread_id);
687 shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
688 ++ schedule.num_active_working_threads;
691 shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
692 -- schedule.num_active_working_threads;
699 if (shared_data.schedule.interrupt_requested) {
704 shared_data.with_lock(shared_data.schedule, [&privdat](Schedule & schedule) {
705 if (schedule.num_launched == schedule.num_total_runs) {
706 privdat.task_id = -1;
709 privdat.task_id = schedule.num_launched;
710 ++ schedule.num_launched ;
713 if ( privdat.task_id < 0 ) {
722 privdat.local_status_report_counter_user = shared_data.status_report.counter_user;
723 privdat.local_status_report_counter_periodic = shared_data.status_report.counter_periodic;
728 _run_task(privdat, shared_data) ;
730 }
catch (TaskInterruptedInnerException & exc) {
731 privdat.logger.debug(
"CxxThreads::run()/worker",
"Tasks interrupted.") ;
733 shared_data.schedule.interrupt_requested =
true;
736 privdat.logger.debug(
"CxxThreads::run()/worker",
"Exception caught inside task! " 737 + boost::current_exception_diagnostic_information()) ;
739 shared_data.schedule.interrupt_requested =
true;
740 shared_data.schedule.inner_exception +=
std::string(
"Exception caught inside task: ")
741 + boost::current_exception_diagnostic_information() +
"\n";
746 ++ shared_data.schedule.num_completed;
755 if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
757 const int sleep_val =
std::max(shared_data.status_report.periodic_interval, 200);
759 while (shared_data.schedule.num_active_working_threads > 0) {
763 privdat.statusReportRequested();
765 privdat.logger.debug(
"CxxThreads::run()",
"[master] Exception caught inside task!") ;
767 shared_data.schedule.interrupt_requested =
true;
768 shared_data.schedule.inner_exception +=
std::string(
"Exception caught inside task: ")
769 + boost::current_exception_diagnostic_information() +
"\n";
770 privdat.logger.debug(
"CxxThreads::run()",
"[master] Exception caught inside task -- handled.") ;
783 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"About to launch threads");
788 for (
int thread_id = 1; thread_id < shared_data.schedule.num_threads; ++thread_id) {
790 worker_fn_id(thread_id);
799 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"Threads finished");
801 if (shared_data.schedule.inner_exception.size()) {
807 if (shared_data.schedule.interrupt_requested) {
811 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"Done.");
820 return shared_data.schedule.num_total_runs;
827 return shared_data.results;
/// Run a single task instance on the calling thread.
///
/// Fetches the input for \c privdat.task_id from the shared constant data,
/// constructs a TaskType from it, runs the task and stores a heap-allocated
/// copy of its result in \c shared_data.results at index \c task_id.
///
/// \param privdat      per-thread data: task id, logger, and the object
///                     passed to the task as its third run() argument.
/// \param shared_data  data shared by all threads (constant data, schedule,
///                     result slots).
///
/// NOTE(review): the body of the interrupt check is not visible in this
/// excerpt -- confirm what it does against the full source.
839 void _run_task(thread_private_data & privdat, thread_shared_data & shared_data)
// Check for a pending interrupt before doing any work for this task.
844 if (shared_data.schedule.interrupt_requested) {
849 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
850 "Run #%lu: querying CData for task input", (
unsigned long)privdat.task_id);
// The task's input is derived from the shared constant data and the task id.
858 const auto input = shared_data.pcdata->getTaskInput(privdat.task_id);
861 privdat.logger.debug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
862 "Running task #%lu ...", (
unsigned long)privdat.task_id);
865 TaskType t(input, shared_data.pcdata, privdat.logger);
868 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
869 "Task #%lu set up.", (
unsigned long)privdat.task_id);
// privdat is handed to the task; it exposes statusReportRequested() / submitStatusReport().
872 t.run(shared_data.pcdata, privdat.logger, &privdat);
874 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
875 "Task #%lu finished, about to collect result.", (
unsigned long)privdat.task_id);
// The result is stored as a raw owning pointer; presumably freed by the dispatcher's loop over results -- confirm.
878 shared_data.results[(
std::size_t)privdat.task_id] =
new TaskResultType(t.stealResult());
880 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
"task done") ;
899 shared_data.status_report.user_fn = fnstatus;
922 ++ shared_data.status_report.counter_user;
935 shared_data.status_report.periodic_interval = milliseconds;
952 shared_data.interrupt_requested = 1;
959 template<
typename TaskType_,
typename TaskCData_,
960 typename LoggerType_,
typename TaskCountIntType_ =
int>
963 mkTaskDispatcher(TaskCData_ * pcdata_,
964 LoggerType_ & logger_,
965 TaskCountIntType_ num_total_runs_,
969 pcdata_, logger_, num_total_runs_, num_threads_
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
void run()
Run the specified tasks.
FullStatusReport< TaskStatusReportType, TaskCountIntType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
Base namespace for the Tomographer project.
TaskDispatcher(TaskCData *pcdata_, LoggerType &logger_, TaskCountIntType num_total_runs_, int num_threads_=(int) std::thread::hardware_concurrency())
Task dispatcher constructor.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
T duration_cast(T... args)
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
Assign a callable to be called whenever a status report is requested.
T hardware_concurrency(T... args)
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
void requestStatusReport()
Request a status report.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
TaskType_ TaskType
The task type.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
void warning(const char *origin, const char *fmt,...)
emit a warning message
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
Some C++ utilities, with a tad of C++11 tricks.
void debug(const char *origin, const char *fmt,...)
emit a debug message
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
void longdebug(const char *origin, const char *fmt,...)
emit a very verbose debugging message
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
TaskType::ResultType TaskResultType
The task result type.
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Dispatches tasks to parallel threads using C++11 native threads.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
void requestInterrupt()
Request an immediate interruption of the tasks.
#define tomographer_assert(...)
Assertion test macro.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Utilities for logging messages.
A complete status report, abstract version.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.