28 #ifndef MULTIPROCTHREADS_H 29 #define MULTIPROCTHREADS_H 34 #include <boost/exception/diagnostic_information.hpp> 47 #ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD 48 # include <mingw.thread.h> 49 # include <mingw.mutex.h> 65 namespace CxxThreads {
111 template<
typename BaseLogger>
118 BaseLogger & _baselogger;
147 TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
148 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
150 _baselogger.
emitLog(level, origin, msg);
155 IsBaseLoggerThreadSafe)
156 bool filterByOrigin(
int level, const
char * origin)
const 158 return _baselogger.filterByOrigin(level, origin);
162 TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
163 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
166 _baselogger.emitLog(level, origin, msg);
171 !IsBaseLoggerThreadSafe)
172 bool filterByOrigin(
int level, const
char * origin)
const 175 return _baselogger.filterByOrigin(level, origin);
190 template<
typename BaseLogger>
191 struct TOMOGRAPHER_EXPORT
LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> >
206 namespace MultiProc {
207 namespace CxxThreads {
247 template<
typename TaskType_,
typename TaskCData_,
248 typename LoggerType_,
typename TaskCountIntType_ =
int>
288 typedef typename Base::template ThreadSharedData<TaskCData, LoggerType>
289 ThreadSharedDataType;
291 ThreadSharedDataType shared_data;
293 struct CriticalSectionManager {
304 template<
typename Fn>
305 inline void critical_status_report(Fn && fn) {
309 template<
typename Fn>
310 inline void critical_status_report_and_user_fn(Fn && fn) {
311 std::lock(status_report_mutex, user_mutex);
316 template<
typename Fn>
317 inline void critical_status_report_and_schedule(Fn && fn) {
318 std::lock(status_report_mutex, schedule_mutex);
323 template<
typename Fn>
324 inline void critical_schedule(Fn && fn) {
330 CriticalSectionManager critical;
333 ThreadSharedDataType,
335 CriticalSectionManager
337 ThreadPrivateDataType;
359 : shared_data(pcdata, logger, num_total_runs, num_threads)
364 : shared_data(
std::move(other.shared_data))
382 logger.debug(
"Let's go!");
384 shared_data.time_start = Base::StdClockType::now();
386 logger.debug(
"Preparing for parallel runs");
388 auto worker_fn_id = [&](
const int thread_id) noexcept(
true) {
391 TaskLoggerType threadsafelogger(shared_data.logger, & critical.user_mutex);
393 Tomographer::Logger::LocalLogger<TaskLoggerType> locallogger(
394 logger.originPrefix()+logger.glue()+
"worker",
397 ThreadPrivateDataType private_data(thread_id, & shared_data, locallogger,
401 stream <<
"Thread #" << thread_id
402 <<
": thread-safe logger and private thread data set up";
407 this->run_worker_enter(private_data, shared_data);
409 this->run_worker_exit(private_data, shared_data);
415 if (shared_data.schedule.interrupt_requested) {
420 critical.critical_schedule([&]() {
421 if (shared_data.schedule.num_launched ==
422 shared_data.schedule.num_total_runs) {
423 private_data.task_id = -1;
427 private_data.task_id = shared_data.schedule.num_launched;
428 ++ shared_data.schedule.num_launched ;
431 if ( private_data.task_id < 0 ) {
438 this->run_task(private_data, shared_data) ;
447 if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
449 this->master_continue_monitoring_status(private_data, shared_data) ;
459 logger.debug(
"About to launch threads");
464 for (
int thread_id = 1; thread_id < shared_data.schedule.num_threads;
468 worker_fn_id(thread_id);
478 logger.debug(
"Threads finished");
480 this->run_epilog(shared_data, logger);
482 logger.debug(
"All done.");
492 return shared_data.schedule.num_total_runs;
499 return shared_data.results;
524 shared_data.status_report.user_fn = fnstatus;
548 ++ shared_data.status_report.event_counter_user;
561 shared_data.status_report.periodic_interval = milliseconds;
580 shared_data.schedule.interrupt_requested = 1;
587 template<
typename TaskType_,
typename TaskCData_,
588 typename LoggerType_,
typename TaskCountIntType_ =
int>
591 mkTaskDispatcher(TaskCData_ * pcdata_,
592 LoggerType_ & logger_,
593 TaskCountIntType_ num_total_runs_,
597 pcdata_, logger_, num_total_runs_, num_threads_
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
Local logger: avoid having to repeat origin at each emitted message.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
void run()
Run the specified tasks.
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
TaskType_ TaskType
The task type.
Thread-local variables and related per-thread data — also serves as TaskManagerIface
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase< TaskType_, TaskCountIntType_ > Base
Base class; provides common functionality to all thread-based MultiProc implementations.
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T hardware_concurrency(T... args)
TaskDispatcher(TaskCData *pcdata, LoggerType &logger, TaskCountIntType num_total_runs, int num_threads=(int) std::thread::hardware_concurrency())
Task dispatcher constructor.
#define TOMO_ORIGIN
Use this as the argument for a Tomographer::Logger::LocalLogger constructor.
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
void requestStatusReport()
Request a one-time status report.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
Traits template struct to be specialized for specific Logger implementations.
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Dispatches tasks to parallel threads using C++11 native threads.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
void requestInterrupt()
Request an immediate interruption of the tasks.
Provide common functionality to thread-based MultiProc implementations.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Utilities for logging messages.
A complete status report, abstract version.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.