28 #ifndef MULTIPROCTHREADS_H 29 #define MULTIPROCTHREADS_H 35 #include <boost/exception/diagnostic_information.hpp> 48 #ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD 49 # include <mingw.thread.h> 50 # include <mingw.mutex.h> 66 namespace CxxThreads {
112 template<
typename BaseLogger>
119 BaseLogger & _baselogger;
148 TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
149 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
151 _baselogger.
emitLog(level, origin, msg);
156 IsBaseLoggerThreadSafe)
157 bool filterByOrigin(
int level, const
char * origin)
const 159 return _baselogger.filterByOrigin(level, origin);
163 TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
164 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
167 _baselogger.emitLog(level, origin, msg);
172 !IsBaseLoggerThreadSafe)
173 bool filterByOrigin(
int level, const
char * origin)
const 176 return _baselogger.filterByOrigin(level, origin);
191 template<
typename BaseLogger>
192 struct TOMOGRAPHER_EXPORT
LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> >
207 namespace MultiProc {
208 namespace CxxThreads {
248 template<
typename TaskType_,
typename TaskCData_,
249 typename LoggerType_,
typename TaskCountIntType_ =
int>
289 typedef typename Base::template ThreadSharedData<TaskCData, LoggerType>
290 ThreadSharedDataType;
292 ThreadSharedDataType shared_data;
294 struct CriticalSectionManager {
305 template<
typename Fn>
306 inline void critical_status_report(Fn && fn) {
310 template<
typename Fn>
311 inline void critical_status_report_and_user_fn(Fn && fn) {
312 std::lock(status_report_mutex, user_mutex);
317 template<
typename Fn>
318 inline void critical_status_report_and_schedule(Fn && fn) {
319 std::lock(status_report_mutex, schedule_mutex);
324 template<
typename Fn>
325 inline void critical_schedule(Fn && fn) {
331 CriticalSectionManager critical;
334 ThreadSharedDataType,
336 CriticalSectionManager
338 ThreadPrivateDataType;
361 : shared_data(pcdata, logger, num_total_runs,
362 ((num_threads > 0) ? num_threads
368 : shared_data(
std::move(other.shared_data))
386 logger.debug(
"Let's go!");
388 shared_data.time_start = Base::StdClockType::now();
390 logger.debug(
"Preparing for parallel runs");
392 auto worker_fn_id = [&](
const int thread_id) noexcept(
true) {
395 TaskLoggerType threadsafelogger(shared_data.logger, & critical.user_mutex);
397 Tomographer::Logger::LocalLogger<TaskLoggerType> locallogger(
398 logger.originPrefix()+logger.glue()+
"worker",
401 ThreadPrivateDataType private_data(thread_id, & shared_data, locallogger,
405 stream <<
"Thread #" << thread_id
406 <<
": thread-safe logger and private thread data set up";
411 this->run_worker_enter(private_data, shared_data);
413 this->run_worker_exit(private_data, shared_data);
419 if (shared_data.schedule.interrupt_requested) {
424 critical.critical_schedule([&]() {
425 if (shared_data.schedule.num_launched ==
426 shared_data.schedule.num_total_runs) {
427 private_data.task_id = -1;
431 private_data.task_id = shared_data.schedule.num_launched;
432 ++ shared_data.schedule.num_launched ;
435 if ( private_data.task_id < 0 ) {
442 this->run_task(private_data, shared_data) ;
451 if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
453 this->master_continue_monitoring_status(private_data, shared_data) ;
463 logger.debug(
"About to launch threads");
468 for (
int thread_id = 1; thread_id < shared_data.schedule.num_threads;
472 worker_fn_id(thread_id);
482 logger.debug(
"Threads finished");
484 this->run_epilog(shared_data, logger);
486 logger.debug(
"All done.");
496 return shared_data.schedule.num_total_runs;
503 return shared_data.results;
528 shared_data.status_report.user_fn = fnstatus;
552 ++ shared_data.status_report.event_counter_user;
565 shared_data.status_report.periodic_interval = milliseconds;
584 shared_data.schedule.interrupt_requested = 1;
591 template<
typename TaskType_,
typename TaskCData_,
592 typename LoggerType_,
typename TaskCountIntType_ =
int>
595 mkTaskDispatcher(TaskCData_ * pcdata_,
596 LoggerType_ & logger_,
597 TaskCountIntType_ num_total_runs_,
601 pcdata_, logger_, num_total_runs_, num_threads_
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
Local logger: avoid having to repeat origin at each emitted message.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
void run()
Run the specified tasks.
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
TaskType_ TaskType
The task type.
Thread-local variables and other per-thread data — also serves as TaskManagerIface
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase< TaskType_, TaskCountIntType_ > Base
Base class, provides common functionality to all thread-based MultiProc implementations.
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T hardware_concurrency(T... args)
#define TOMO_ORIGIN
Use this as the argument for a Tomographer::Logger::LocalLogger constructor.
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
TaskDispatcher(TaskCData *pcdata, LoggerType &logger, TaskCountIntType num_total_runs, int num_threads=0)
Task dispatcher constructor.
void requestStatusReport()
Request a one-time status report.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
Traits template struct to be specialized for specific Logger implementations.
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Dispatches tasks to parallel threads using C++11 native threads.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
void requestInterrupt()
Request an immediate interruption of the tasks.
Provide common functionality to thread-based MultiProc implementations.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Utilities for logging messages.
A complete status report, abstract version.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.