27 #ifndef MULTIPROCOMP_H
28 #define MULTIPROCOMP_H
32 #ifdef TOMOGRAPHER_HAVE_OMP
// Single-threaded stand-in sharing the name of OpenMP's omp_get_thread_num():
// always reports thread index 0. (constexpr functions are implicitly inline.)
// NOTE(review): presumably only compiled when the real <omp.h> is unavailable; the
// preprocessor branch selecting it is not fully visible in this listing -- confirm.
constexpr int omp_get_thread_num()
{
  return 0;
}
// Single-threaded stand-in sharing the name of OpenMP's omp_get_num_threads():
// always reports a team of exactly one thread. (constexpr functions are implicitly
// inline.)
// NOTE(review): presumably only compiled when the real <omp.h> is unavailable; the
// preprocessor branch selecting it is not fully visible in this listing -- confirm.
constexpr int omp_get_num_threads()
{
  return 1;
}
59 namespace tomo_internal {
// Helper that forwards logging calls to a BaseLogger. This is the primary template,
// parameterized on the baseLoggerIsThreadSafe flag; a <BaseLogger, true>
// specialization exists as well.
// NOTE(review): this listing skips source lines (its embedded numbers jump, e.g.
// 78 -> 83 and 86 -> 92), so the braces and anything wrapping the forwarded calls are
// not visible here -- consult the full header before relying on this excerpt.
75 template<
typename BaseLogger,
bool baseLoggerIsThreadSafe>
76 struct ThreadSanitizerLoggerHelper
// Passes level, origin and msg on to baselogger.emit_log().
78 static inline void emit_log(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
83 baselogger.emit_log(level, origin, msg);
// Queries baselogger.filter_by_origin(level, origin). The result is stored in `ok`;
// the declaration of `ok` and the return statement are on lines missing from this
// listing.
86 static inline bool filter_by_origin(BaseLogger & baselogger,
int level,
const char * origin)
92 ok = baselogger.filter_by_origin(level, origin);
// Specialization of ThreadSanitizerLoggerHelper for baseLoggerIsThreadSafe == true.
// Both members hand their arguments directly to the corresponding BaseLogger method.
// NOTE(review): the listing's embedded numbers skip lines (105 -> 108, 110 -> 113);
// braces and any other statements in the bodies are not visible here.
102 template<
typename BaseLogger>
103 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
// Forwards level, origin and msg to baselogger.emit_log().
105 static inline void emit_log(BaseLogger & baselogger,
int level,
const char * origin,
const std::string & msg)
108 baselogger.emit_log(level, origin, msg);
// Returns whatever baselogger.filter_by_origin(level, origin) returns.
110 static inline bool filter_by_origin(BaseLogger & baselogger,
int level,
const char * origin)
113 return baselogger.filter_by_origin(level, origin);
// Logger wrapper templated on BaseLogger; it holds a reference to a base logger and
// routes emit_log() / filter_by_origin() through
// tomo_internal::ThreadSanitizerLoggerHelper.
// NOTE(review): the class head, the constructor (only its `template<typename...
// MoreArgs>` line survives), the second helper template argument and the tails of
// the helper calls are on lines missing from this listing.
166 template<
typename BaseLogger>
// Reference to the wrapped logger; every call below is made on this object.
169 BaseLogger & _baselogger;
172 template<
typename... MoreArgs>
186 ~ThreadSanitizerLogger()
// Emits one message by delegating to the helper with
// (_baselogger, level, origin, msg).
190 inline void emit_log(
int level,
const char * origin,
const std::string& msg)
193 tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
196 _baselogger, level, origin, msg
// Const member template (defaulted `bool dummy = true` parameter) that returns the
// helper's answer for (_baselogger, level, origin). Its return type sits on a line
// that is not visible here.
200 template<
bool dummy = true>
202 filter_by_origin(
int level,
const char * origin)
const
204 return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
207 _baselogger, level, origin
222 template<
typename BaseLogger>
235 namespace MultiProc {
240 template<
typename TaskStatusReportType>
// Template head and public type aliases of the OpenMP task dispatcher.
// CountIntType_ defaults to int.
// NOTE(review): the listing jumps from 328 to 334, so the end of the parameter list
// (including the TaskLogger_ parameter used below), the class head and the `Task`
// alias that TaskStatusReportType depends on are not visible here.
327 template<
typename Task_,
typename ConstantDataType_,
typename ResultsCollector_,
328 typename Logger_,
typename CountIntType_ = int,
// Status report type is taken from the task type itself.
334 typedef typename Task::StatusReportType TaskStatusReportType;
// The remaining aliases re-export the template parameters under unsuffixed names.
335 typedef ConstantDataType_ ConstantDataType;
336 typedef ResultsCollector_ ResultsCollector;
337 typedef Logger_ Logger;
338 typedef CountIntType_ CountIntType;
339 typedef TaskLogger_ TaskLogger;
// State that all worker threads access through a single pointer: the constant input
// data, the results collector, the status-report bookkeeping and the run counters.
// NOTE(review): the listing skips lines 350-352 and several member declarations
// (e.g. for `logger`, `status_report_counter` and `status_report_full`, which are
// used elsewhere in this file), so neither the initializer list nor the member list
// below is complete.
347 struct thread_shared_data {
348 thread_shared_data(
const ConstantDataType *pcdata_, ResultsCollector * results_, Logger & logger_,
349 CountIntType num_total_runs_, CountIntType n_chunk_)
// No status report is in progress at construction time; all report counters start
// at zero and the report and handler are value-initialized.
353 status_report_underway(
false),
354 status_report_initialized(
false),
355 status_report_counter(0),
356 status_report_numreportsrecieved(0),
357 status_report_full(),
358 status_report_user_fn(),
// Run configuration is copied from the arguments; nothing is completed or active yet.
359 num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
360 num_active_working_threads(0)
363 const ConstantDataType * pcdata;
364 ResultsCollector * results;
// Flags set while a full status report is being assembled and cleared once it has
// been delivered to the user handler.
367 bool status_report_underway;
368 bool status_report_initialized;
// Number of per-thread reports gathered so far for the current full report.
370 CountIntType status_report_numreportsrecieved;
// User callback that receives the assembled full status report.
373 FullStatusReportCallbackType status_report_user_fn;
// Total number of tasks, and the chunk size passed to the OpenMP dynamic schedule.
375 CountIntType num_total_runs;
376 CountIntType n_chunk;
// Tasks finished so far.
377 CountIntType num_completed;
// Threads currently executing a task.
379 CountIntType num_active_working_threads;
// Per-thread state handed to each running task (a pointer to it is passed to
// Task::run()); through it a task can check for and answer status-report requests.
// NOTE(review): this listing skips many source lines (braces, the `logger` member
// declaration, possible early returns and any OpenMP synchronization pragmas are
// not visible), so statement nesting cannot be determined from this excerpt alone.
382 struct thread_private_data
384 thread_shared_data * shared_data;
// Last value of shared_data->status_report_counter this thread has acknowledged.
389 CountIntType local_status_report_counter;
// True when the shared request counter differs from the locally acknowledged value,
// i.e. a status report has been requested since this thread last synchronized.
391 inline bool status_report_requested()
394 return (
int)local_status_report_counter != (int)shared_data->status_report_counter;
// Records this thread's status report and, once every active thread has reported,
// passes the assembled full report to the user handler.
397 inline void submit_status_report(
const TaskStatusReportType &statreport)
// Equal counters mean no report was requested: log a warning.
399 if ((
int)local_status_report_counter == (
int)shared_data->status_report_counter) {
401 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
"Task submitted unsollicited status report");
// Acknowledge the request so status_report_requested() returns false again.
410 local_status_report_counter = shared_data->status_report_counter;
// The thread number is used below as the index of this thread's report slot.
413 int threadnum = omp_get_thread_num();
// Set-up for a new full report, entered when none has been initialized yet.
419 if (!shared_data->status_report_initialized) {
424 if (shared_data->status_report_underway) {
426 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
427 "status report already underway!");
430 if (!shared_data->status_report_user_fn) {
432 logger->warning(
"OMP TaskDispatcher/taskmanageriface",
433 "no user status report handler set!"
434 " call set_status_report_handler() first.");
441 shared_data->status_report_underway =
true;
442 shared_data->status_report_initialized =
true;
// Snapshot the global counters into the report.
446 shared_data->status_report_full.num_completed = shared_data->num_completed;
447 shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
448 shared_data->status_report_full.num_active_working_threads = shared_data->num_active_working_threads;
449 int num_threads = omp_get_num_threads();
450 shared_data->status_report_full.num_threads = num_threads;
// One slot per thread: tasks_running starts all-false, tasks_reports is
// default-constructed.
454 shared_data->status_report_full.tasks_running.clear();
455 shared_data->status_report_full.tasks_reports.clear();
456 shared_data->status_report_full.tasks_running.resize(num_threads,
false);
457 shared_data->status_report_full.tasks_reports.resize(num_threads);
// NOTE(review): "%lu" is given size() results without a cast, while the later debug
// call casts to long. If logger->debug() formats printf-style, this mismatches on
// platforms where std::size_t is not unsigned long -- confirm.
458 logger->debug(
"OMP TaskDispatcher/taskmanageriface",
"vectors resized to %lu & %lu, resp.",
459 shared_data->status_report_full.tasks_running.size(),
460 shared_data->status_report_full.tasks_reports.size());
461 shared_data->status_report_numreportsrecieved = 0;
473 logger->debug(
"OMP TaskDispatcher/taskmanageriface",
"threadnum=%ld, tasks_reports.size()=%ld",
474 (
long)threadnum, (
long)shared_data->status_report_full.tasks_reports.size());
// The slot index must be within the vector sized above.
476 assert(0 <= threadnum && (
std::size_t)threadnum < shared_data->status_report_full.tasks_reports.size());
// Store this thread's report in its slot and mark the slot as running.
478 shared_data->status_report_full.tasks_running[threadnum] =
true;
479 shared_data->status_report_full.tasks_reports[threadnum] = statreport;
481 ++ shared_data->status_report_numreportsrecieved;
// Once as many reports have arrived as there are active threads, deliver the full
// report to the user handler and reset the bookkeeping for the next request.
483 if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
485 shared_data->status_report_user_fn(shared_data->status_report_full);
487 shared_data->status_report_numreportsrecieved = 0;
488 shared_data->status_report_underway =
false;
489 shared_data->status_report_initialized =
false;
490 shared_data->status_report_full.tasks_running.clear();
491 shared_data->status_report_full.tasks_reports.clear();
// The dispatcher's single thread_shared_data instance; worker threads reach it
// through a pointer.
499 thread_shared_data shared_data;
// Constructor: passes every argument straight on to the thread_shared_data
// constructor. The constructor body is on lines missing from this listing.
502 TaskDispatcher(ConstantDataType * pcdata_, ResultsCollector * results_, Logger & logger_,
503 CountIntType num_total_runs_, CountIntType n_chunk_)
504 : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
// Body of the dispatcher's task-running routine (its log origin is
// "run_omp_tasks()"): initialize the results collector, execute all tasks in an
// OpenMP parallel loop, then notify the collector that the runs have finished.
// NOTE(review): the function's signature line, the declaration of `k`, the braces
// and possibly synchronization pragmas are on lines missing from this listing (the
// embedded numbers jump, e.g. 531 -> 535 and 558 -> 561 -> 564). As shown, the
// updates to the shared counters have no visible protection -- verify against the
// full header.
510 shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
512 shared_data.logger.debug(
"run_omp_tasks()",
"About to start parallel section.");
// Plain local copies and a plain pointer, which are what the OpenMP shared(...)
// clause below names.
516 CountIntType num_total_runs = shared_data.num_total_runs;
517 CountIntType n_chunk = shared_data.n_chunk;
522 thread_shared_data *shdat = &shared_data;
523 thread_private_data privdat;
// Parallel region: k and privdat are private to each thread, and default(none)
// requires every other variable used inside to be listed explicitly.
525 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk)
527 privdat.shared_data = shdat;
// Tasks are handed out dynamically in chunks of n_chunk; nowait drops the implicit
// barrier at the end of the loop.
530 #pragma omp for schedule(dynamic,n_chunk) nowait
531 for (k = 0; k < num_total_runs; ++k) {
// Mark this thread as working and acknowledge the current report request counter.
535 ++ shdat->num_active_working_threads;
536 privdat.local_status_report_counter = shdat->status_report_counter;
// Per-task logger built from the shared logger, the constant data and the task index.
540 TaskLogger threadsafelogger(shdat->logger, shdat->pcdata, k);
544 privdat.logger = &threadsafelogger;
546 threadsafelogger.debug(
"run_omp_tasks()",
"Running task #%lu ...", (
unsigned long)k);
// Build the task from its input, run it (privdat lets it handle status-report
// requests), then hand its result to the collector.
549 Task t(Task::get_input(k, shdat->pcdata), shdat->pcdata, threadsafelogger);
552 t.run(shdat->pcdata, threadsafelogger, &privdat);
556 shdat->results->collect_result(k, t.getResult(), shdat->pcdata);
// A report request this task never acknowledged is counted as received --
// presumably so the pending full report does not wait on a finished task (confirm).
558 if ((
int)privdat.local_status_report_counter != (int)shdat->status_report_counter) {
561 ++ shdat->status_report_numreportsrecieved;
564 ++ shdat->num_completed;
565 -- shdat->num_active_working_threads;
// Tell the results collector that the runs are over.
570 shared_data.results->runs_finished(num_total_runs, shared_data.pcdata);
// Stores the callable `fnstatus` as the user status-report handler; it is the
// object invoked with the full status report once all active threads have reported.
// NOTE(review): the function's signature line is missing from this listing.
601 template<
typename Fn>
606 shared_data.status_report_user_fn = fnstatus;
// Requests a status report by advancing the shared request counter. The counter is
// masked with 0x7f, so it wraps around within 0..127; tasks notice the request
// because their locally stored counter value no longer matches.
// NOTE(review): the listing skips lines 611-620 of this function's body.
610 inline void request_status_report()
621 shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
// Factory function: builds and returns a TaskDispatcher<Task_, ConstantDataType_,
// ResultsCollector_, Logger_, CountIntType_> from the given arguments.
// NOTE(review): CountIntType_ defaults to unsigned int here, whereas the
// TaskDispatcher class template defaults it to int. The function-name line and the
// first parameter line are missing from this listing.
631 template<
typename Task_,
typename ConstantDataType_,
typename ResultsCollector_,
632 typename Logger_,
typename CountIntType_ =
unsigned int>
633 inline TaskDispatcher<Task_, ConstantDataType_, ResultsCollector_,
634 Logger_, CountIntType_>
636 CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
// All five arguments are forwarded unchanged to the TaskDispatcher constructor.
639 return TaskDispatcher<Task_, ConstantDataType_, ResultsCollector_,
640 Logger_, CountIntType_>(
641 pcdata_, results_, logger_, num_total_runs_, n_chunk_
int num_total_runs
Total number of tasks to perform.
Base namespace for the Tomographer project.
int num_completed
Number of completed tasks.
int level() const
Get the log level set for this logger.
TaskDispatcher< Task_, ConstantDataType_, ResultsCollector_, Logger_, CountIntType_ > makeTaskDispatcher(ConstantDataType_ *pcdata_, ResultsCollector_ *results_, Logger_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
Create an OMP task dispatcher. Useful if you want C++'s template parameter deduction mechanism to deduce the dispatcher's template arguments from the function arguments where possible.
std::vector< TaskStatusReportType > tasks_reports
List of length num_threads with the raw report submitted from each individual thread.
int num_threads
Number of spawned threads (some may be idle)
std::vector< bool > tasks_running
List of length num_threads, specifying for each spawned thread whether it is active or not.
Traits template struct to be specialized for specific Logger implementations.
A complete status report of currently running threads.
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Dispatches tasks to parallel threads using OpenMP.
int num_active_working_threads
Number of currently active threads (which are actively solving a task)
void set_status_report_handler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.