27 #ifndef MULTIPROCOMP_H 28 #define MULTIPROCOMP_H 35 inline constexpr
int omp_get_thread_num() {
return 0; }
36 inline constexpr
int omp_get_num_threads() {
return 1; }
59 namespace tomo_internal {
75 template<
typename BaseLogger,
bool baseLoggerIsThreadSafe>
76 struct ThreadSanitizerLoggerHelper
78 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
83 baselogger.emitLog(level, origin, msg);
86 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
92 ok = baselogger.filterByOrigin(level, origin);
102 template<
typename BaseLogger>
103 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
105 static inline void emitLog(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
108 baselogger.emitLog(level, origin, msg);
110 static inline bool filterByOrigin(BaseLogger & baselogger,
int level,
const char * origin)
113 return baselogger.filterByOrigin(level, origin);
169 template<
typename BaseLogger>
172 BaseLogger & _baselogger;
183 template<
typename... MoreArgs>
206 tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
209 _baselogger, level, origin, msg
214 template<
bool dummy = true>
218 return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
221 _baselogger, level, origin
236 template<
typename BaseLogger>
251 namespace MultiProc {
311 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
312 typename LoggerType_,
typename CountIntType_ = int,
313 typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
344 struct thread_shared_data {
345 thread_shared_data(
const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
346 CountIntType num_total_runs_, CountIntType n_chunk_)
350 status_report_underway(
false),
351 status_report_initialized(
false),
352 status_report_counter(0),
353 status_report_numreportsrecieved(0),
354 status_report_full(),
355 status_report_user_fn(),
356 num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
357 num_active_working_threads(0)
360 const TaskCData * pcdata;
361 ResultsCollector * results;
364 bool status_report_underway;
365 bool status_report_initialized;
367 CountIntType status_report_numreportsrecieved;
369 FullStatusReportType status_report_full;
370 FullStatusReportCallbackType status_report_user_fn;
372 CountIntType num_total_runs;
373 CountIntType n_chunk;
374 CountIntType num_completed;
376 CountIntType num_active_working_threads;
379 struct thread_private_data
381 thread_shared_data * shared_data;
383 TaskLoggerType * logger;
386 CountIntType local_status_report_counter;
388 inline bool statusReportRequested()
391 return (
int)local_status_report_counter != (int)shared_data->status_report_counter;
394 inline void submitStatusReport(
const TaskStatusReportType &statreport)
396 if ((
int)local_status_report_counter == (
int)shared_data->status_report_counter) {
398 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
"Task submitted unsollicited status report");
407 local_status_report_counter = shared_data->status_report_counter;
410 int threadnum = omp_get_thread_num();
414 shared_data->logger.longdebug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
415 stream <<
"status report received for thread #" << threadnum <<
", treating it ...";
422 if (!shared_data->status_report_initialized) {
427 if (shared_data->status_report_underway) {
431 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
432 "status report already underway!");
435 if (!shared_data->status_report_user_fn) {
439 shared_data->logger.warning(
"OMP TaskDispatcher/taskmanageriface",
440 "no user status report handler set!" 441 " call setStatusReportHandler() first.");
448 shared_data->status_report_underway =
true;
449 shared_data->status_report_initialized =
true;
452 shared_data->status_report_full = FullStatusReportType();
453 shared_data->status_report_full.num_completed = shared_data->num_completed;
454 shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
456 int num_threads = omp_get_num_threads();
461 shared_data->status_report_full.workers_running.clear();
462 shared_data->status_report_full.workers_reports.clear();
463 shared_data->status_report_full.workers_running.resize(num_threads,
false);
464 shared_data->status_report_full.workers_reports.resize(num_threads);
468 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface", [&](
std::ostream & stream) {
469 stream <<
"vectors resized to workers_running.size()=" 470 << shared_data->status_report_full.workers_running.size()
471 <<
" and workers_reports.size()=" 472 << shared_data->status_report_full.workers_reports.size()
476 shared_data->status_report_numreportsrecieved = 0;
490 shared_data->logger.debug(
"OMP TaskDispatcher/taskmanageriface",
"threadnum=%ld, workers_reports.size()=%ld",
491 (
long)threadnum, (
long)shared_data->status_report_full.workers_reports.size());
493 tomographer_assert(0 <= threadnum &&
494 (
std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
496 shared_data->status_report_full.workers_running[threadnum] =
true;
497 shared_data->status_report_full.workers_reports[threadnum] = statreport;
499 ++ shared_data->status_report_numreportsrecieved;
501 if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
503 shared_data->status_report_user_fn(shared_data->status_report_full);
505 shared_data->status_report_numreportsrecieved = 0;
506 shared_data->status_report_underway =
false;
507 shared_data->status_report_initialized =
false;
508 shared_data->status_report_full.workers_running.clear();
509 shared_data->status_report_full.workers_reports.clear();
517 thread_shared_data shared_data;
541 TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
542 CountIntType num_total_runs_, CountIntType n_chunk_)
543 : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
554 shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
556 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"preparing for parallel runs");
560 CountIntType num_total_runs = shared_data.num_total_runs;
561 CountIntType n_chunk = shared_data.n_chunk;
566 thread_shared_data *shdat = &shared_data;
567 thread_private_data privdat;
569 shared_data.logger.debug(
"MultiProc::OMP::TaskDispatcher::run()",
"About to start parallel section");
571 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk) 573 privdat.shared_data = shdat;
576 #pragma omp for schedule(dynamic,n_chunk) nowait 577 for (k = 0; k < num_total_runs; ++k) {
582 _run_task(privdat, shdat, k);
587 shared_data.results->runsFinished(num_total_runs, shared_data.pcdata);
591 void _run_task(thread_private_data & privdat, thread_shared_data * shdat, CountIntType k)
597 ++ shdat->num_active_working_threads;
598 privdat.local_status_report_counter = shdat->status_report_counter;
602 TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
605 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
606 "Run #%lu: thread-safe logger set up", (
unsigned long)k);
610 privdat.logger = &threadsafelogger;
613 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
614 "Run #%lu: querying CData for task input", (
unsigned long)k);
616 auto input = shdat->pcdata->getTaskInput(k);
619 threadsafelogger.debug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
620 "Running task #%lu ...", (
unsigned long)k);
623 TaskType t(input, shdat->pcdata, threadsafelogger);
626 threadsafelogger.longdebug(
"Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
627 "Task #%lu set up.", (
unsigned long)k);
630 t.run(shdat->pcdata, threadsafelogger, &privdat);
634 shdat->results->collectResult(k, t.getResult(), shdat->pcdata);
636 if ((
int)privdat.local_status_report_counter != (int)shdat->status_report_counter) {
639 ++ shdat->status_report_numreportsrecieved;
642 ++ shdat->num_completed;
643 -- shdat->num_active_working_threads;
663 shared_data.status_report_user_fn = fnstatus;
688 shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
698 template<
typename TaskType_,
typename TaskCData_,
typename ResultsCollector_,
699 typename LoggerType_,
typename CountIntType_ =
int>
701 LoggerType_, CountIntType_>
703 CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
707 LoggerType_, CountIntType_>(
708 pcdata_, results_, logger_, num_total_runs_, n_chunk_
TaskType_ TaskType
The task type.
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Base namespace for the Tomographer project.
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
ResultsCollector_ ResultsCollector
The type which is responsible to collect the final results of the individual tasks.
TaskDispatcher< TaskType_, TaskCData_, ResultsCollector_, LoggerType_, CountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, ResultsCollector_ *results_, LoggerType_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType n_chunk_)
Task dispatcher constructor.
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
void run() TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Run the specified tasks.
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Dispatches tasks to parallel threads using OpenMP.
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
Utilities for logging messages.
void requestStatusReport()
Request a status report.
A complete status report, abstract version.