28 #ifndef MULTIPROCTHREADS_H 29 #define MULTIPROCTHREADS_H 37 #include <boost/exception/diagnostic_information.hpp> 58 namespace CxxThreads {
104 template<
typename BaseLogger>
111 BaseLogger & _baselogger;
124 template<
typename... MoreArgs>
140 TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
143 _baselogger.emitLog(level, origin, msg);
148 IsBaseLoggerThreadSafe)
151 return _baselogger.filterByOrigin(level, origin);
155 TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
159 _baselogger.emitLog(level, origin, msg);
164 !IsBaseLoggerThreadSafe)
168 return _baselogger.filterByOrigin(level, origin);
183 template<
typename BaseLogger>
198 namespace MultiProc {
199 namespace CxxThreads {
258 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
259 typename LoggerType_,
typename CountIntType_ =
int>
290 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 291 std::chrono::monotonic_clock
300 TaskInterruptedInnerException() : msg(
"Task Interrupted") { }
301 virtual ~TaskInterruptedInnerException()
throw() { };
302 const char * what()
const throw() {
return msg.
c_str(); }
307 TaskInnerException(
std::string msgexc) : msg(
"Task raised an exception: "+msgexc) { }
308 virtual ~TaskInnerException()
throw() { };
309 const char * what()
const throw() {
return msg.
c_str(); }
313 struct thread_shared_data {
314 thread_shared_data(
const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
315 CountIntType num_total_runs, CountIntType num_threads)
320 schedule(num_total_runs, num_threads),
324 const TaskCData * pcdata;
327 ResultsCollector * results;
330 StdClockType::time_point time_start;
333 const CountIntType num_threads;
334 CountIntType num_active_working_threads;
336 const CountIntType num_total_runs;
337 CountIntType num_completed;
338 CountIntType num_launched;
345 Schedule(CountIntType num_total_runs_, CountIntType num_threads_)
346 : num_threads(num_threads_),
347 num_active_working_threads(0),
348 num_total_runs(num_total_runs_),
351 interrupt_requested(0),
363 int periodic_interval;
364 CountIntType numreportsrecieved;
365 FullStatusReportType full_report;
366 FullStatusReportCallbackType user_fn;
376 periodic_interval(-1),
377 numreportsrecieved(0),
387 template<
typename Struct,
typename Fn>
388 void with_lock(Struct & s, Fn fn) {
395 struct thread_private_data
397 const CountIntType thread_id;
399 thread_shared_data * shared_data;
401 TaskLoggerType & logger;
403 CountIntType task_id;
404 CountIntType local_status_report_counter;
406 thread_private_data(CountIntType thread_id_, thread_shared_data * shared_data_, TaskLoggerType & logger_)
407 : thread_id(thread_id_),
408 shared_data(shared_data_),
411 local_status_report_counter(0)
415 inline bool statusReportRequested()
const 417 if (shared_data->schedule.interrupt_requested) {
418 logger.
longdebug(
"CxxThreads::thread_private_data::statusReportRequested()",
419 "tasks interrupt has been requested");
420 throw TaskInterruptedInnerException();
426 if (thread_id == 0) {
429 if (shared_data->status_report.periodic_interval > 0 && shared_data->status_report.user_fn) {
430 _master_thread_update_status_report_periodic_interval_counter();
435 if (shared_data->status_report.ready) {
436 logger.
longdebug(
"Tomographer::MultiProc::CxxThreads::thread_private_data::statusReportRequested()",
437 "Status report is ready.");
440 std::lock(shared_data->status_report.mutex, shared_data->user_mutex);
445 shared_data->status_report.user_fn(shared_data->status_report.full_report);
447 shared_data->status_report.numreportsrecieved = 0;
448 shared_data->status_report.underway =
false;
449 shared_data->status_report.initialized =
false;
450 shared_data->status_report.ready =
false;
451 shared_data->status_report.full_report.workers_running.clear();
452 shared_data->status_report.full_report.workers_reports.clear();
456 return (
int)local_status_report_counter != (int)shared_data->status_report.counter;
460 inline void _master_thread_update_status_report_periodic_interval_counter()
const 462 shared_data->status_report.counter = (uint32_t)(
463 (std::chrono::duration_cast<std::chrono::milliseconds>(
464 StdClockType::now().time_since_epoch()
465 ).count() / shared_data->status_report.periodic_interval) & 0x00FFFFFF
472 inline void submitStatusReport(
const TaskStatusReportType &statreport)
474 if ((
int)local_status_report_counter == (
int)shared_data->status_report.counter) {
476 logger.
warning(
"CxxThreads TaskDispatcher/taskmanageriface",
"Task submitted unsollicited status report");
483 local_status_report_counter = shared_data->status_report.counter;
487 stream <<
"status report received for thread #" << thread_id <<
", treating it ... " 488 <<
"numreportsrecieved=" << shared_data->status_report.numreportsrecieved
489 <<
" num_active_working_threads=" << shared_data->schedule.num_active_working_threads ;
496 if (!shared_data->status_report.initialized) {
501 if (shared_data->status_report.underway) {
503 logger.
warning(
"CxxThreads TaskDispatcher/taskmanageriface",
"status report already underway!");
506 if (!shared_data->status_report.user_fn) {
508 logger.
warning(
"CxxThreads TaskDispatcher/taskmanageriface",
509 "no user status report handler set! Call setStatusReportHandler() first.");
513 shared_data->status_report.underway =
true;
514 shared_data->status_report.initialized =
true;
515 shared_data->status_report.ready =
false;
518 shared_data->status_report.full_report = FullStatusReportType();
519 shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
520 shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
522 StdClockType::now() - shared_data->time_start
524 int num_threads = shared_data->schedule.num_threads;
528 shared_data->status_report.full_report.workers_running.clear();
529 shared_data->status_report.full_report.workers_reports.clear();
530 shared_data->status_report.full_report.workers_running.resize(num_threads,
false);
531 shared_data->status_report.full_report.workers_reports.resize(num_threads);
533 shared_data->status_report.numreportsrecieved = 0;
535 logger.
debug(
"CxxThreads TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
536 stream <<
"vectors resized to workers_running.size()=" 537 << shared_data->status_report.full_report.workers_running.size()
538 <<
" and workers_reports.size()=" 539 << shared_data->status_report.full_report.workers_reports.size()
547 logger.
debug(
"CxxThreads TaskDispatcher/taskmanageriface",
"thread_id=%ld, workers_reports.size()=%ld",
548 (
long)thread_id, (
long)shared_data->status_report.full_report.workers_reports.size());
551 (
std::size_t)thread_id < shared_data->status_report.full_report.workers_reports.size());
553 shared_data->status_report.full_report.workers_running[thread_id] =
true;
554 shared_data->status_report.full_report.workers_reports[thread_id] = statreport;
556 ++ shared_data->status_report.numreportsrecieved;
558 if (shared_data->status_report.numreportsrecieved == shared_data->schedule.num_active_working_threads) {
565 shared_data->status_report.ready =
true;
572 thread_shared_data shared_data;
589 TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
590 CountIntType num_total_runs_,
592 : shared_data(pcdata_, results_, logger_, num_total_runs_, num_threads_)
602 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"Let's go!");
603 shared_data.time_start = StdClockType::now();
605 shared_data.results->init(shared_data.schedule.num_total_runs, 1, shared_data.pcdata);
607 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"preparing for parallel runs");
611 auto worker_fn_id = [&](
const int thread_id) noexcept(
true) {
614 TaskLoggerType threadsafelogger(shared_data.logger, & shared_data.user_mutex);
616 thread_private_data privdat(thread_id, & shared_data, threadsafelogger);
619 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::run()",
620 "Thread #%d: thread-safe logger and private thread data set up", thread_id);
625 shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
626 ++ schedule.num_active_working_threads;
629 shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
630 -- schedule.num_active_working_threads;
637 if (shared_data.schedule.interrupt_requested) {
642 shared_data.with_lock(shared_data.schedule, [&privdat](Schedule & schedule) {
643 if (schedule.num_launched == schedule.num_total_runs) {
644 privdat.task_id = -1;
647 privdat.task_id = schedule.num_launched;
648 ++ schedule.num_launched ;
651 if ( privdat.task_id < 0 ) {
660 privdat.local_status_report_counter = shared_data.status_report.counter;
665 _run_task(privdat, shared_data) ;
667 }
catch (TaskInterruptedInnerException & exc) {
668 privdat.logger.debug(
"CxxThreads::run()/worker",
"Tasks interrupted.") ;
670 shared_data.schedule.interrupt_requested =
true;
673 privdat.logger.debug(
"CxxThreads::run()/worker",
"Exception caught inside task! " 674 + boost::current_exception_diagnostic_information()) ;
676 shared_data.schedule.interrupt_requested =
true;
677 shared_data.schedule.inner_exception +=
std::string(
"Exception caught inside task: ")
678 + boost::current_exception_diagnostic_information() +
"\n";
683 ++ shared_data.schedule.num_completed;
692 if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
694 const int sleep_val =
std::max(shared_data.status_report.periodic_interval, 200);
696 while (shared_data.schedule.num_active_working_threads > 0) {
700 privdat.statusReportRequested();
702 privdat.logger.debug(
"CxxThreads::run()",
"[master] Exception caught inside task!") ;
704 shared_data.schedule.interrupt_requested =
true;
705 shared_data.schedule.inner_exception +=
std::string(
"Exception caught inside task: ")
706 + boost::current_exception_diagnostic_information() +
"\n";
707 privdat.logger.debug(
"CxxThreads::run()",
"[master] Exception caught inside task -- handled.") ;
720 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"About to launch threads");
725 for (CountIntType thread_id = 1; thread_id < shared_data.schedule.num_threads; ++thread_id) {
727 worker_fn_id(thread_id);
736 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"Threads finished");
738 if (shared_data.schedule.inner_exception.size()) {
744 if (shared_data.schedule.interrupt_requested) {
748 shared_data.results->runsFinished(shared_data.schedule.num_total_runs, shared_data.pcdata);
750 shared_data.logger.debug(
"MultiProc::CxxThreads::TaskDispatcher::run()",
"Done.");
756 void _run_task(thread_private_data & privdat, thread_shared_data & shared_data)
761 if (shared_data.schedule.interrupt_requested) {
766 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
767 "Run #%lu: querying CData for task input", (
unsigned long)privdat.task_id);
775 const auto input = shared_data.pcdata->getTaskInput(privdat.task_id);
778 privdat.logger.debug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
779 "Running task #%lu ...", (
unsigned long)privdat.task_id);
782 TaskType t(input, shared_data.pcdata, privdat.logger);
785 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
786 "Task #%lu set up.", (
unsigned long)privdat.task_id);
789 t.run(shared_data.pcdata, privdat.logger, &privdat);
791 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
792 "Task #%lu finished, about to collect result.", (
unsigned long)privdat.task_id);
795 shared_data.results->collectResult(privdat.task_id, t.getResult(), shared_data.pcdata);
807 privdat.logger.longdebug(
"Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
"task done") ;
825 shared_data.status_report.user_fn = fnstatus;
848 shared_data.status_report.counter = (shared_data.status_report.counter + 1) & 0x7f;
861 shared_data.status_report.periodic_interval = milliseconds;
878 shared_data.interrupt_requested = 1;
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
void run()
Run the specified tasks.
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType num_threads_=std::thread::hardware_concurrency())
Task dispatcher constructor.
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
Base namespace for the Tomographer project.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T duration_cast(T... args)
T hardware_concurrency(T... args)
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
TaskType_ TaskType
The task type.
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
bool filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin() for a base logger which is thread-safe...
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
See LoggerTraits<Derived> and DefaultLoggerTraits.
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
void requestInterrupt()
Request an immediate interruption of the tasks.
void requestStatusReport()
Request a status report.
See LoggerTraits<Derived> and DefaultLoggerTraits.
void warning(const char *origin, const char *fmt,...)
emit a warning message
Some C++ utilities, with a tad of C++11 tricks.
void debug(const char *origin, const char *fmt,...)
emit an debug message
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
void longdebug(const char *origin, const char *fmt,...)
emit a very verbose debugging message
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Dispatches tasks to parallel threads using C++11 native threads.
LoggerBase(int level_=INFO)
Construct the base logger object.
ResultsCollector_ ResultsCollector
The type which is responsible to collect the final results of the individual tasks.
int level() const
Get the log level set for this logger.
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
#define tomographer_assert(...)
Assertion test macro.
Utilities for logging messages.
A complete status report, abstract version.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.