28 #ifndef MULTIPROCOMP_H 29 #define MULTIPROCOMP_H 38 inline constexpr
int omp_get_thread_num() {
return 0; }
39 inline constexpr
int omp_get_num_threads() {
return 1; }
42 #include <boost/exception/diagnostic_information.hpp> 116 template<
typename BaseLogger>
123 BaseLogger & _baselogger;
135 template<
typename... MoreArgs>
150 TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
151 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
153 _baselogger.
emitLog(level, origin, msg);
158 IsBaseLoggerThreadSafe)
159 bool filterByOrigin(
int level, const
char * origin)
const 161 return _baselogger.filterByOrigin(level, origin);
165 TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
166 inline
void emitLog(
int level, const
char * origin, const
std::
string& msg)
170 _baselogger.emitLog(level, origin, msg);
176 !IsBaseLoggerThreadSafe)
177 bool filterByOrigin(
int level, const
char * origin)
const 182 ok = _baselogger.filterByOrigin(level, origin);
200 template<
typename BaseLogger>
201 struct TOMOGRAPHER_EXPORT
LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> >
216 namespace MultiProc {
277 template<
typename TaskType_,
typename TaskCData_,
typename LoggerType_,
278 typename TaskCountIntType_ = int,
279 typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
319 typedef typename Base::template ThreadSharedData<TaskCData, LoggerType>
320 ThreadSharedDataType;
322 ThreadSharedDataType shared_data;
325 struct CriticalSectionManager {
327 template<
typename Fn>
328 inline void critical_status_report(Fn && fn) {
334 template<
typename Fn>
335 inline void critical_status_report_and_user_fn(Fn && fn) {
341 template<
typename Fn>
342 inline void critical_status_report_and_schedule(Fn && fn) {
348 template<
typename Fn>
349 inline void critical_schedule(Fn && fn) {
357 CriticalSectionManager critical;
360 ThreadSharedDataType,
362 CriticalSectionManager
364 ThreadPrivateDataType;
390 : shared_data(pcdata_, logger_, num_total_runs_, 0), n_chunk(n_chunk_)
412 logger.debug(
"Let's go!");
414 shared_data.time_start = Base::StdClockType::now();
416 logger.debug(
"Preparing for parallel runs");
419 logger.warning(
"OpenMP is disabled; tasks will run serially.");
430 ThreadSharedDataType * shdat = & shared_data;
432 const std::string logger_prefix = logger.originPrefix()+logger.glue()+
"worker";
433 const std::string * logger_prefix_ptr = &logger_prefix;
435 logger.debug(
"About to start parallel section");
437 #pragma omp parallel default(none) private(k) shared(shdat, logger_prefix_ptr, num_total_runs, n_chunk_) 440 TaskLoggerType threadsafelogger(shared_data.logger, shdat->pcdata, k);
442 Tomographer::Logger::LocalLogger<TaskLoggerType> locallogger(*logger_prefix_ptr, threadsafelogger);
444 ThreadPrivateDataType private_data(omp_get_thread_num(), & shared_data, locallogger, critical);
446 private_data.shared_data = shdat;
447 private_data.task_id = -1;
452 shdat->schedule.num_threads = omp_get_num_threads();
463 this->run_worker_enter(private_data, *shdat);
468 #pragma omp for schedule(dynamic,n_chunk) nowait 469 for (k = 0; k < num_total_runs; ++k) {
471 private_data.task_id = k;
473 this->run_task(private_data, shared_data);
480 this->run_worker_exit(private_data, *shdat);
484 this->master_continue_monitoring_status(private_data, *shdat) ;
489 logger.debug(
"OpenMP parallel section finished");
491 this->run_epilog(shared_data, logger) ;
493 logger.debug(
"Done.");
500 return shared_data.schedule.num_total_runs;
507 return shared_data.results;
514 return *shared_data.results[k];
535 shared_data.status_report.user_fn = fnstatus;
561 ++ shared_data.status_report.event_counter_user;
576 shared_data.status_report.periodic_interval = milliseconds;
595 shared_data.schedule.interrupt_requested = 1;
605 template<
typename TaskType_,
typename TaskCData_,
606 typename LoggerType_,
typename TaskCountIntType_ =
int>
608 LoggerType_, TaskCountIntType_>
610 TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_ = 1)
614 LoggerType_, TaskCountIntType_>(
615 pcdata_, logger_, num_total_runs_, n_chunk_
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
Local logger: avoid having to repeat origin at each emitted message.
void requestStatusReport()
Request a status report.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
TaskType_ TaskType
The task type.
void run()
Run the specified tasks.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
thread-local variables and stuff — also serves as TaskManagerIface
Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase< TaskType_, TaskCountIntType_ > Base
Base class, provides common functionality to all thread-based MutliProc implementations.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
#define TOMO_ORIGIN
Use this as argument for a Tomographer::Logger::LocalLogger constructor .
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
TaskDispatcher< TaskType_, TaskCData_, LoggerType_, TaskCountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, LoggerType_ &logger_, TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_=1)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Dispatches tasks to parallel threads using OpenMP.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
void requestInterrupt()
Request an immediate interruption of the tasks.
Provide common functionality to thread-based MultiProc implementations.
TaskDispatcher(TaskCData *pcdata_, LoggerType &logger_, TaskCountIntType num_total_runs_, TaskCountIntType n_chunk_=1)
Task dispatcher constructor.
Utilities for logging messages.
A complete status report, abstract version.