28 #ifndef MULTIPROCTHREADCOMMON_H 29 #define MULTIPROCTHREADCOMMON_H 34 #include <boost/exception/diagnostic_information.hpp> 54 #ifdef TOMOGRAPHER_USE_WINDOWS_SLEEP 57 # define TOMOGRAPHER_SLEEP_FOR_MS(x) Sleep((x)) 61 # ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD 62 # include <mingw.thread.h> 64 # define TOMOGRAPHER_SLEEP_FOR_MS(x) \ 65 std::this_thread::sleep_for(std::chrono::milliseconds((x))) 72 namespace ThreadCommon {
97 template<
typename TaskType_,
typename TaskCountIntType_ =
int>
129 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 130 std::chrono::monotonic_clock
141 const char * what()
const throw() {
return msg.
c_str(); }
145 template<
typename TaskCData,
typename LoggerType>
148 TaskCountIntType num_total_runs,
int num_threads)
152 time_start(StdClockType::now()),
153 schedule(num_total_runs, num_threads),
164 status_report(
std::move(x.status_report))
169 for (
auto r : results) {
176 const TaskCData * pcdata;
184 StdClockType::time_point time_start;
188 int num_active_working_threads;
190 const TaskCountIntType num_total_runs;
191 TaskCountIntType num_completed;
192 TaskCountIntType num_launched;
197 Schedule(TaskCountIntType num_total_runs_,
int num_threads_)
198 : num_threads(num_threads_),
199 num_active_working_threads(0),
200 num_total_runs(num_total_runs_),
203 interrupt_requested(0),
209 num_active_working_threads(x.num_active_working_threads),
210 num_total_runs(x.num_total_runs),
211 num_completed(x.num_completed),
212 num_launched(x.num_launched),
213 interrupt_requested(x.interrupt_requested),
214 inner_exception(
std::move(x.inner_exception))
223 int periodic_interval;
224 int num_waiting_reports;
226 FullStatusReportType full_report;
227 FullStatusReportCallbackType user_fn;
241 : in_preparation(false),
243 periodic_interval(-1),
244 num_waiting_reports(0),
247 event_counter_user(0),
248 event_counter_master(0u),
253 : in_preparation(x.in_preparation),
255 periodic_interval(x.periodic_interval),
256 num_waiting_reports(x.num_waiting_reports),
259 event_counter_user(x.event_counter_user),
260 event_counter_master(x.event_counter_master),
261 last_report_time(x.last_report_time)
269 template<
typename ThreadSharedDataType,
typename TaskLocalLoggerType,
typename CriticalExecutionLocker>
274 CriticalExecutionLocker & locker;
276 ThreadSharedDataType * shared_data;
278 TaskLocalLoggerType & llogger;
280 TaskCountIntType task_id;
283 int local_status_report_event_counter_user;
285 unsigned int local_status_report_event_counter;
289 TaskLocalLoggerType & llogger_, CriticalExecutionLocker & locker_)
290 : thread_id(thread_id_),
292 shared_data(shared_data_),
295 local_status_report_event_counter_user(0),
296 local_status_report_event_counter(0u)
300 : thread_id(x.thread_id),
302 shared_data(x.shared_data),
305 local_status_report_event_counter_user(x.local_status_report_event_counter_user),
306 local_status_report_event_counter(x.local_status_report_event_counter)
310 inline bool statusReportRequested()
312 if (shared_data->schedule.interrupt_requested) {
313 llogger.subLogger(
"/TaskManagerIface::statusReportRequested()")
314 .longdebug(
"Tasks interrupt has been requested");
321 if (thread_id == 0) {
323 _master_update_event_counter();
327 if (shared_data->status_report.ready) {
328 _master_send_status_report();
332 return local_status_report_event_counter != shared_data->status_report.event_counter_master;
336 inline void _master_update_event_counter()
338 if (local_status_report_event_counter_user != (
int)shared_data->status_report.event_counter_user) {
343 local_status_report_event_counter_user = (int)shared_data->status_report.event_counter_user;
345 _master_initiate_status_report();
347 }
else if ( shared_data->status_report.periodic_interval > 0 &&
348 ( std::chrono::duration_cast<std::chrono::milliseconds>(
349 StdClockType::now() - shared_data->status_report.last_report_time
350 ).count() > shared_data->status_report.periodic_interval ) ) {
353 _master_initiate_status_report();
358 inline void _master_initiate_status_report()
360 locker.critical_status_report([&]() {
364 llogger.originPrefix() + llogger.glue()
365 +
std::string(
"TaskManagerIface::statusReportRequested()"),
370 if (shared_data->status_report.in_preparation) {
371 logger.
longdebug(
"Still working on previous status report, ignoring new report due");
375 if (!shared_data->status_report.user_fn) {
377 logger.warning(
"no user status report handler set! Call setStatusReportHandler() first.");
381 shared_data->status_report.in_preparation =
true;
382 shared_data->status_report.ready =
false;
387 shared_data->status_report.last_report_time = StdClockType::now();
390 shared_data->status_report.full_report = FullStatusReportType();
391 shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
392 shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
394 StdClockType::now() - shared_data->time_start
396 int num_threads = shared_data->schedule.num_threads;
400 shared_data->status_report.full_report.workers_running.clear();
401 shared_data->status_report.full_report.workers_reports.clear();
402 shared_data->status_report.full_report.workers_running.resize((
std::size_t)num_threads,
false);
403 shared_data->status_report.full_report.workers_reports.resize((
std::size_t)num_threads);
405 shared_data->status_report.num_waiting_reports = shared_data->schedule.num_active_working_threads;
408 stream <<
"vectors resized to workers_running.size()=" 409 << shared_data->status_report.full_report.workers_running.size()
410 <<
" and workers_reports.size()=" 411 << shared_data->status_report.full_report.workers_reports.size()
416 ++ shared_data->status_report.event_counter_master;
420 inline void _master_send_status_report()
422 auto logger = llogger.subLogger(
"/TaskManagerIface::statusReportRequested()");
423 logger.longdebug(
"Status report is ready, sending to user function.");
425 locker.critical_status_report_and_user_fn([&](){
427 shared_data->status_report.user_fn(
std::move(shared_data->status_report.full_report));
429 shared_data->status_report.in_preparation =
false;
430 shared_data->status_report.ready =
false;
431 shared_data->status_report.num_waiting_reports = 0;
432 shared_data->status_report.full_report.workers_running.clear();
433 shared_data->status_report.full_report.workers_reports.clear();
437 inline void submitStatusReport(
const TaskStatusReportType & report)
439 local_status_report_event_counter = shared_data->status_report.event_counter_master;
443 stream <<
"status report received for thread #" << thread_id
444 <<
", treating it ... " 445 <<
"number of reports still expected=" 446 << shared_data->status_report.num_waiting_reports
447 <<
" num_active_working_threads=" 448 << shared_data->schedule.num_active_working_threads
449 <<
" workers_reports.size()=" 450 << shared_data->status_report.full_report.workers_reports.size();
453 locker.critical_status_report_and_schedule([&]() {
459 thread_id >= (
int)shared_data->status_report.full_report.workers_reports.size()) {
462 "Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase::TaskPrivateData::submitStatusReport(): " 463 "Internal inconsistency: thread_id=%d out of range [0,%d]\n",
464 thread_id, (
int)shared_data->status_report.full_report.workers_reports.size()
466 -- shared_data->status_report.num_waiting_reports ;
470 shared_data->status_report.full_report.workers_running[(
std::size_t)thread_id] =
true;
471 shared_data->status_report.full_report.workers_reports[(
std::size_t)thread_id] = report;
473 int num_waiting = -- shared_data->status_report.num_waiting_reports ;
475 if (num_waiting <= 0) {
480 shared_data->status_report.ready =
true;
488 locker.critical_schedule([&]() {
489 shared_data->schedule.interrupt_requested = 1;
490 shared_data->schedule.inner_exception.push_back(exc);
517 template<
typename ThreadPrivateDataType,
typename ThreadSharedDataType>
518 void run_worker_enter(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
520 private_data.locker.critical_status_report_and_schedule([&]() {
521 ++ shared_data.schedule.num_active_working_threads;
522 if (shared_data.status_report.in_preparation) {
525 private_data.local_status_report_event_counter =
526 shared_data.status_report.event_counter_master - 1;
528 private_data.local_status_report_event_counter =
529 shared_data.status_report.event_counter_master;
536 template<
typename ThreadPrivateDataType,
typename ThreadSharedDataType>
537 void run_worker_exit(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
539 private_data.locker.critical_status_report_and_schedule([&]() {
540 -- shared_data.schedule.num_active_working_threads;
541 if (shared_data.status_report.in_preparation) {
545 if (private_data.local_status_report_event_counter
546 != shared_data.status_report.event_counter_master) {
550 if (shared_data.status_report.num_waiting_reports == 1) {
552 shared_data.status_report.num_waiting_reports = 0;
553 shared_data.status_report.ready = true;
568 template<
typename ThreadPrivateDataType,
typename ThreadSharedDataType>
570 run_task(ThreadPrivateDataType & private_data, ThreadSharedDataType & shared_data)
572 auto & logger = private_data.llogger;
578 if (shared_data.schedule.interrupt_requested) {
583 stream <<
"Run #" << private_data.task_id <<
": querying CData for task input";
586 const auto input = shared_data.pcdata->getTaskInput(private_data.task_id);
589 stream <<
"Running task #" << private_data.task_id;
593 TaskType t(input, shared_data.pcdata, logger.parentLogger());
597 stream <<
"Task #" << private_data.task_id <<
" set up.";
601 t.run(shared_data.pcdata, logger.parentLogger(), &private_data);
604 stream <<
"Task #" << private_data.task_id <<
" finished, about to collect result.";
611 private_data.locker.critical_schedule([&shared_data]() {
615 ++ shared_data.schedule.num_completed;
619 stream <<
"Task #" << private_data.task_id <<
" done.";
624 logger.debug(
"Task interrupted.") ;
638 template<
typename ThreadPrivateDataType,
typename ThreadSharedDataType>
644 TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
648 private_data.statusReportRequested();
652 private_data.llogger.debug(
"[master] tasks were interrupted, returning") ;
657 private_data.llogger.debug(
"[master] Exception caught inside task!") ;
659 private_data.llogger.debug(
"[master] Exception caught inside task -- handled.") ;
664 }
while (shared_data.schedule.num_active_working_threads > 0) ;
671 template<
typename ThreadSharedDataType,
typename LocalLoggerType>
672 void run_epilog(ThreadSharedDataType & shared_data, LocalLoggerType & llogger)
674 if (shared_data.schedule.inner_exception.size()) {
676 if (shared_data.schedule.inner_exception.size() > 1) {
677 llogger.warning(
"Multiple exceptions caught in tasks, only the first one is re-thrown");
683 if (shared_data.schedule.interrupt_requested) {
void run_worker_exit(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
A worker exits the game.
void run_worker_enter(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
New worker in the game.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
TaskType_ TaskType
The task type.
FullStatusReport< TaskStatusReportType, TaskCountIntType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
thread-local variables and stuff — also serves as TaskManagerIface
T duration_cast(T... args)
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T current_exception(T... args)
StdClockType::time_point last_report_time
Only used by master thread to detect when to send periodic status reports.
volatile std::sig_atomic_t event_counter_user
Gets incremented when user requests status report.
unsigned int event_counter_master
Master thread increments this whenever other threads should provide status report – use unsigned to ...
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
#define TOMO_STATIC_ASSERT_EXPR(...)
Tool for static assertions without message.
void TOMOGRAPHER_CXX_STACK_FORCE_REALIGN run_task(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
Run a given task.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
void run_epilog(ThreadSharedDataType &shared_data, LocalLoggerType &llogger)
To be called after all workers are done, to e.g. throw proper exception if an error occurred...
Some C++ utilities, with a tad of C++11 tricks.
void longdebug(const char *fmt,...)
Generate a log message with level Logger::LONGDEBUG (printf-like syntax)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
TaskDispatcherBase()
Basic constructor.
void master_continue_monitoring_status(ThreadPrivateDataType &private_data, ThreadSharedDataType &shared_data)
To be called by master thread only to continue monitoring for status reports.
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
T rethrow_exception(T... args)
Provide common functionality to thread-based MultiProc implementations.
Utilities for logging messages.
A complete status report, abstract version.