Tomographer  v2.0
Tomographer C++ Framework Documentation
multiprocomp.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2015 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24  * SOFTWARE.
25  */
26 
27 #ifndef MULTIPROCOMP_H
28 #define MULTIPROCOMP_H
29 
30 #include <csignal>
31 
32 #ifdef _OPENMP
33 #include <omp.h>
34 #else
// Fallback used when _OPENMP is not defined: the calling thread always has index 0.
35 inline constexpr int omp_get_thread_num() { return 0; }
// Fallback used when _OPENMP is not defined: the reported thread count is always 1.
36 inline constexpr int omp_get_num_threads() { return 1; }
37 #endif
38 
39 
41 #include <tomographer2/tools/cxxutil.h> // tomographer_assert()
43 #include <tomographer2/multiproc.h>
44 
45 
55 namespace Tomographer {
56 namespace MultiProc {
57 namespace OMP {
58 
59 namespace tomo_internal {
60 
// Helper that forwards logging calls to the given base logger. In this primary template
// every forwarded call is wrapped in an unnamed omp critical section. A separate
// specialization for baseLoggerIsThreadSafe == true (further down) forwards without any
// critical section.
75 template<typename BaseLogger, bool baseLoggerIsThreadSafe>
76 struct ThreadSanitizerLoggerHelper
77 {
    // Forward one message (level, origin, msg) to baselogger.emitLog() from inside a
    // critical section.
78  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
79  {
80 #pragma omp critical
81  {
82  //printf("ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- OMP CRITICAL\n", level, origin, msg.c_str());
83  baselogger.emitLog(level, origin, msg);
84  }
85  }
    // Forward to baselogger.filterByOrigin() from inside a critical section. The answer
    // is stored in the local ok so that the return statement sits after the critical
    // block rather than inside it.
86  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
87  {
88  bool ok = true;
89 #pragma omp critical
90  {
91  //printf("ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- OMP CRITICAL\n", level, origin);
92  ok = baselogger.filterByOrigin(level, origin);
93  }
94  return ok;
95  }
96 };
97 
98 //
99 // specialize the helper for when logging to a thread-safe base logger. No critical
100 // section needed because the logger is already thread-safe.
101 //
// Partial specialization selected when the second template argument is true: both
// methods are plain pass-throughs to the base logger, with no critical section.
102 template<typename BaseLogger>
103 struct ThreadSanitizerLoggerHelper<BaseLogger, true>
104 {
     // Pass the message straight to baselogger.emitLog().
105  static inline void emitLog(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
106  {
107  //printf("ThreadSanitizerLoggerHelper::emitLog(%d, %s, %s) -- NORMAL\n", level, origin, msg.c_str());
108  baselogger.emitLog(level, origin, msg);
109  }
     // Return whatever baselogger.filterByOrigin() returns for (level, origin).
110  static inline bool filterByOrigin(BaseLogger & baselogger, int level, const char * origin)
111  {
112  //printf("ThreadSanitizerLoggerHelper::filterByOrigin(%d, %s) -- NORMAL\n", level, origin);
113  return baselogger.filterByOrigin(level, origin);
114  }
115 };
116 
117 } // namespace tomo_internal
118 
119 
// Logger wrapper holding a reference to a BaseLogger. emitLog() and filterByOrigin() are
// delegated to tomo_internal::ThreadSanitizerLoggerHelper, which is where any critical
// section is applied.
//
// NOTE(review): the listing numbering jumps over several source lines in this class
// (for instance 197, 207, 215 and 219). As a result the declaration that owns the empty
// body after the constructor, the second template argument given to the helper, and the
// return type of filterByOrigin() are not visible here. The symbol index at the end of
// this page gives the filterByOrigin() return type as
// std::enable_if< dummy && Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type.
169 template<typename BaseLogger>
170 class ThreadSanitizerLogger : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
171 {
     // The wrapped logger. Stored by reference, so it has to outlive this object.
172  BaseLogger & _baselogger;
173 public:
174 
     // Wrap the given logger. Any further constructor arguments are accepted and ignored
     // (the parameter pack is unnamed and never used).
183  template<typename... MoreArgs>
184  ThreadSanitizerLogger(BaseLogger & logger, MoreArgs&&...)
185  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
186  // this one, and is fixed and cannot be changed while running.
187  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
188  _baselogger(logger)
189  {
190  // when you have to debug the log mechanism.... lol
191  //printf("ThreadSanitizerLogger(): object created\n");
192  //_baselogger.debug("ThreadSanitizerLogger()", "log from constructor.");
193  //emitLog(Logger::DEBUG, "ThreadSanitizerLogger!", "emitLog from constructor");
194  //LoggerBase<ThreadSanitizerLogger<BaseLogger> >::debug("ThreadSanitizerLogger", "debug from constructor");
195  }
196 
     // NOTE(review): empty body whose declaration line (197) is missing from the listing;
     // presumably the destructor -- confirm against the real header.
198  {
199  }
200 
201 
     // Hand the message to the helper, which forwards it to the wrapped logger.
203  inline void emitLog(int level, const char * origin, const std::string& msg)
204  {
205  //printf("ThreadSanitizerLogger::emitLog(%d, %s, %s)\n", level, origin, msg.c_str());
206  tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
208  ::emitLog(
209  _baselogger, level, origin, msg
210  );
211  }
212 
     // Hand the (level, origin) query to the helper and return its answer.
214  template<bool dummy = true>
216  filterByOrigin(int level, const char * origin) const
217  {
218  return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
220  ::filterByOrigin(
221  _baselogger, level, origin
222  );
223  }
224 };
225 
226 } // namespace OMP
227 } // namespace MultiProc
228 
229 namespace Logger {
// Traits for ThreadSanitizerLogger<BaseLogger>: everything is inherited from the traits of
// the wrapped BaseLogger, except for the two enumerators redefined below.
236 template<typename BaseLogger>
237 struct LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> > : public LoggerTraits<BaseLogger>
238 {
240  enum {
     // Set to 0 here. Presumably this makes LoggerBase use the level handed to its
     // constructor instead of asking the derived class -- confirm against loggers.h.
243  HasOwnGetLevel = 0,
     // Set to 1 here: the wrapper is declared thread-safe regardless of the base logger.
245  IsThreadSafe = 1
246  };
247 };
248 } // namespace Logger
249 
250 
251 namespace MultiProc {
252 namespace OMP {
253 
254 
311 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
312  typename LoggerType_, typename CountIntType_ = int,
313  typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
315 {
316 public:
     // The task type (first template parameter of the dispatcher).
318  typedef TaskType_ TaskType;
     // The type used by a single task when providing a status report.
320  typedef typename TaskType::StatusReportType TaskStatusReportType;
     // The type which stores constant, shared data for all tasks to access.
322  typedef TaskCData_ TaskCData;
     // The type which is responsible to collect the final results of the individual tasks.
324  typedef ResultsCollector_ ResultsCollector;
     // The logger type specified to the dispatcher (not necessarily thread-safe).
326  typedef LoggerType_ LoggerType;
     // Integer type used to count the number of tasks to run (or running).
328  typedef CountIntType_ CountIntType;
     // A thread-safe logger type which is passed on to the child tasks.
330  typedef TaskLoggerType_ TaskLoggerType;
333 
340 
341 private:
342 
     // State shared by all worker threads of one dispatcher: the task inputs and outputs,
     // the original logger, and the bookkeeping for status reports.
     //
     // NOTE(review): FullStatusReportType and FullStatusReportCallbackType are used below
     // but their typedef lines (332, 339) are missing from this listing. The symbol index
     // at the end of the page gives them as FullStatusReport< TaskStatusReportType > and
     // std::function< void(const FullStatusReportType &)>.
344  struct thread_shared_data {
     // Store the pointers and the logger reference, copy the run counts, and start with
     // no status report in progress, no completed runs and no active threads.
345  thread_shared_data(const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
346  CountIntType num_total_runs_, CountIntType n_chunk_)
347  : pcdata(pcdata_),
348  results(results_),
349  logger(logger_),
350  status_report_underway(false),
351  status_report_initialized(false),
352  status_report_counter(0),
353  status_report_numreportsrecieved(0),
354  status_report_full(),
355  status_report_user_fn(),
356  num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
357  num_active_working_threads(0)
358  { }
359 
     // Constant data handed to every task (getTaskInput(), task constructor, task run()).
360  const TaskCData * pcdata;
     // Receives init(), collectResult() for each run, and runsFinished().
361  ResultsCollector * results;
     // The original (unwrapped) logger given to the dispatcher.
362  LoggerType & logger;
363 
     // Both flags are set when the first thread starts assembling a full report and are
     // cleared again once that report has been passed to the user callback.
364  bool status_report_underway;
365  bool status_report_initialized;
     // Bumped by requestStatusReport(), which per its own comment may run inside a signal
     // handler. Each thread compares it with its local copy to detect a pending request.
366  volatile std::sig_atomic_t status_report_counter;
     // Number of threads accounted for in the report currently being assembled.
367  CountIntType status_report_numreportsrecieved;
368 
     // The report being assembled, and the user callback that receives it when complete.
369  FullStatusReportType status_report_full;
370  FullStatusReportCallbackType status_report_user_fn;
371 
     // Total number of runs, chunk size used in the schedule(dynamic, ...) clause of
     // run(), and number of runs whose result has been collected so far.
372  CountIntType num_total_runs;
373  CountIntType n_chunk;
374  CountIntType num_completed;
375 
     // Incremented at the start of each _run_task() call and decremented at its end.
376  CountIntType num_active_working_threads;
377  };
     // Per-thread state. A pointer to this object is passed to the task as the third
     // argument of its run() call, so the task can poll statusReportRequested() and answer
     // with submitStatusReport().
379  struct thread_private_data
380  {
     // The state shared with all other threads (set at the top of the parallel region).
381  thread_shared_data * shared_data;
382 
     // The wrapped logger for the current run (set in _run_task()).
383  TaskLoggerType * logger;
384 
     // Index of the run currently handled by this thread.
385  CountIntType kiter;
     // Value of the shared status_report_counter this thread last synchronized with.
386  CountIntType local_status_report_counter;
387 
     // True when the shared request counter differs from the value this thread last saw,
     // i.e. a status report has been requested and not yet answered by this thread.
388  inline bool statusReportRequested()
389  {
390  //fprintf(stderr, "statusReportRequested(), shared_data=%p\n", shared_data);
391  return (int)local_status_report_counter != (int)shared_data->status_report_counter;
392  }
393 
     // Add the report of this thread to the full report under construction. The first
     // reporting thread initializes the full report. Once the number of received reports
     // equals the number of active working threads, the user callback is invoked and the
     // bookkeeping is reset. A report submitted without a pending request is rejected
     // with a warning.
394  inline void submitStatusReport(const TaskStatusReportType &statreport)
395  {
396  if ((int)local_status_report_counter == (int)shared_data->status_report_counter) {
397  // error: task submitted unsolicited report
398  logger->warning("OMP TaskDispatcher/taskmanageriface", "Task submitted unsollicited status report");
399  return;
400  }
401 
402 #pragma omp critical
403  {
404  bool ok = true; // whether to proceed or not
405 
406  // we've reacted to the given "signal"
407  local_status_report_counter = shared_data->status_report_counter;
408 
409  // add our status report to being-prepared status report in the shared data
410  int threadnum = omp_get_thread_num();
411 
412  // Important: access the original logger, not the thread-safe-wrapped logger!!
413  // Otherwise this could lead to deadlocks because of nested critical blocks.
414  shared_data->logger.longdebug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
415  stream << "status report received for thread #" << threadnum << ", treating it ...";
416  });
417 
418  //
419  // If we're the first reporting thread, we need to initiate the status reporting
420  // procedure and initialize the general data
421  //
422  if (!shared_data->status_report_initialized) {
423 
424  //
425  // Check that we indeed have to submit a status report.
426  //
427  if (shared_data->status_report_underway) {
428  // status report already underway!
429  // Important: access the original logger, not the thread-safe-wrapped logger!!
430  // Otherwise this could lead to deadlocks because of nested critical blocks.
431  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
432  "status report already underway!");
433  ok = false;
434  }
435  if (!shared_data->status_report_user_fn) {
436  // no user handler set
437  // Important: access the original logger, not the thread-safe-wrapped logger!!
438  // Otherwise this could lead to deadlocks because of nested critical blocks.
439  shared_data->logger.warning("OMP TaskDispatcher/taskmanageriface",
440  "no user status report handler set!"
441  " call setStatusReportHandler() first.");
442  ok = false;
443  }
444 
445  // since we can't return out of a critical section(?), we use an if block.
446  if (ok) {
447 
448  shared_data->status_report_underway = true;
449  shared_data->status_report_initialized = true;
450 
451  // initialize status report object & overall data
452  shared_data->status_report_full = FullStatusReportType();
453  shared_data->status_report_full.num_completed = shared_data->num_completed;
454  shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
455  // shared_data->status_report_full.num_active_working_threads = shared_data->num_active_working_threads;
456  int num_threads = omp_get_num_threads();
457  // shared_data->status_report_full.num_threads = num_threads;
458 
459  // initialize task-specific reports
460  // fill our lists with default-constructed values & set all running to false.
461  shared_data->status_report_full.workers_running.clear();
462  shared_data->status_report_full.workers_reports.clear();
463  shared_data->status_report_full.workers_running.resize(num_threads, false);
464  shared_data->status_report_full.workers_reports.resize(num_threads);
465 
466  // Important: access the original logger, not the thread-safe-wrapped logger!!
467  // Otherwise this could lead to deadlocks because of nested critical blocks.
468  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
469  stream << "vectors resized to workers_running.size()="
470  << shared_data->status_report_full.workers_running.size()
471  << " and workers_reports.size()="
472  << shared_data->status_report_full.workers_reports.size()
473  << ".";
474  });
475 
476  shared_data->status_report_numreportsrecieved = 0;
477  }
478 
479  } // status_report_initialized
480 
481  // if we're the first reporting thread, then maybe ok was set to false above, so
482  // check again.
483  if (ok) {
484 
485  //
486  // Report the data corresponding to this thread.
487  //
488  // Important: access the original logger, not the thread-safe-wrapped logger!!
489  // Otherwise this could lead to deadlocks because of nested critical blocks.
490  shared_data->logger.debug("OMP TaskDispatcher/taskmanageriface", "threadnum=%ld, workers_reports.size()=%ld",
491  (long)threadnum, (long)shared_data->status_report_full.workers_reports.size());
492 
493  tomographer_assert(0 <= threadnum &&
494  (std::size_t)threadnum < shared_data->status_report_full.workers_reports.size());
495 
496  shared_data->status_report_full.workers_running[threadnum] = true;
497  shared_data->status_report_full.workers_reports[threadnum] = statreport;
498 
499  ++ shared_data->status_report_numreportsrecieved;
500 
501  if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
502  // the report is ready to be transmitted to the user: go!
      // Note that the user callback runs while this critical section is still held.
503  shared_data->status_report_user_fn(shared_data->status_report_full);
504  // all reports received: done --> reset our status_report_* flags
505  shared_data->status_report_numreportsrecieved = 0;
506  shared_data->status_report_underway = false;
507  shared_data->status_report_initialized = false;
508  shared_data->status_report_full.workers_running.clear();
509  shared_data->status_report_full.workers_reports.clear();
510  }
511  } // if ok
512  } // omp critical
513 
514  }
515  };
516 
517  thread_shared_data shared_data;
518 
519 public:
// Task dispatcher constructor. All five arguments are forwarded unchanged to the
// shared_data member: the constant task data, the results collector, the logger, the
// total number of runs and the chunk size. The two pointers and the logger reference are
// stored as given, not copied, so the objects they refer to must outlive the dispatcher.
541  TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
542  CountIntType num_total_runs_, CountIntType n_chunk_)
543  : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
544  {
545  }
546 
// Run the specified tasks. The results collector is initialized, then runs 0 to
// num_total_runs-1 are distributed over the threads of an OpenMP parallel region (dynamic
// schedule with chunk size n_chunk), and finally runsFinished() is called on the results
// collector.
// NOTE(review): listing line 552 is missing here. The symbol index at the end of this
// page shows the declaration as run() followed by TOMOGRAPHER_CXX_STACK_FORCE_REALIGN.
551  void run()
553  {
554  shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
555 
556  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "preparing for parallel runs");
557 
558  // declaring these as "const" causes a weird compiler error
559  // "`n_chunk' is predetermined `shared' for `shared'"
560  CountIntType num_total_runs = shared_data.num_total_runs;
561  CountIntType n_chunk = shared_data.n_chunk;
562  (void)n_chunk; // silence "unused variable" warning when compiling without OMP support
563 
564  CountIntType k = 0;
565 
     // Local pointer to the shared state, so that it can be named in the shared(...)
     // clause below. privdat is listed as private, so each thread works on its own copy.
566  thread_shared_data *shdat = &shared_data;
567  thread_private_data privdat;
568 
569  shared_data.logger.debug("MultiProc::OMP::TaskDispatcher::run()", "About to start parallel section");
570 
571 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk)
572  {
     // Each thread fills in its private copy before picking up any run. The remaining
     // fields of privdat are assigned inside _run_task().
573  privdat.shared_data = shdat;
574  privdat.kiter = 0;
575 
576 #pragma omp for schedule(dynamic,n_chunk) nowait
577  for (k = 0; k < num_total_runs; ++k) {
578 
579  // make separate function call, so that we can tell GCC to realign the stack on
580  // platforms which don't do that automatically (yes, MinGW, it's you I'm looking
581  // at)
582  _run_task(privdat, shdat, k);
583 
584  }
585  }
586 
587  shared_data.results->runsFinished(num_total_runs, shared_data.pcdata);
588  }
589 
590 private:
// Execute run number k on the calling thread: register the thread as active, build a
// wrapped logger, fetch the task input from the shared constant data, construct and run
// the task, then hand its result to the results collector and unregister the thread.
// NOTE(review): listing line 592 is missing between the signature and the opening brace;
// presumably it carries the stack-realign attribute mentioned in run() -- confirm.
591  void _run_task(thread_private_data & privdat, thread_shared_data * shdat, CountIntType k)
593  {
594 
595 #pragma omp critical
596  {
     // Count this thread as active and synchronize with the current request counter, so
     // that only requests made after this point are seen as pending by this run.
597  ++ shdat->num_active_working_threads;
598  privdat.local_status_report_counter = shdat->status_report_counter;
599  }
600 
601  // construct a thread-safe logger we can use
602  TaskLoggerType threadsafelogger(shdat->logger, shdat->pcdata, k);
603 
604  // not sure an std::ostream would be safe here threadwise...?
605  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
606  "Run #%lu: thread-safe logger set up", (unsigned long)k);
607 
608  // set up our thread-private data
     // (privdat.logger points at a local object, so it is only valid during this call)
609  privdat.kiter = k;
610  privdat.logger = &threadsafelogger;
611 
612  // not sure an std::ostream would be safe here threadwise...?
613  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
614  "Run #%lu: querying CData for task input", (unsigned long)k);
615 
616  auto input = shdat->pcdata->getTaskInput(k);
617 
618  // not sure an std::ostream would be safe here threadwise...?
619  threadsafelogger.debug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
620  "Running task #%lu ...", (unsigned long)k);
621 
622  // construct a new task instance
623  TaskType t(input, shdat->pcdata, threadsafelogger);
624 
625  // not sure an std::ostream would be safe here threadwise...?
626  threadsafelogger.longdebug("Tomographer::MultiProc::OMP::TaskDispatcher::_run_task()",
627  "Task #%lu set up.", (unsigned long)k);
628 
629  // and run it
     // (the pointer to privdat is what the task uses to poll for and submit status reports)
630  t.run(shdat->pcdata, threadsafelogger, &privdat);
631 
632 #pragma omp critical
633  {
634  shdat->results->collectResult(k, t.getResult(), shdat->pcdata);
635 
636  if ((int)privdat.local_status_report_counter != (int)shdat->status_report_counter) {
637  // status report request missed by task... do as if we had provided a
638  // report, but don't provide report.
639  ++ shdat->status_report_numreportsrecieved;
640  }
641 
642  ++ shdat->num_completed;
643  -- shdat->num_active_working_threads;
644  }
645  }
646 
647 public:
648 
// Assign a callable to be called whenever a status report is requested. The callable is
// stored in the shared data from inside an unnamed omp critical section; it is invoked by
// the worker thread that completes a full status report.
659  inline void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
660  {
661 #pragma omp critical
662  {
663  shared_data.status_report_user_fn = fnstatus;
664  }
665  }
666 
// Request a status report. This only advances the shared request counter; worker threads
// notice the change when they compare it with their local copy. No report is produced
// directly by this call.
677  inline void requestStatusReport()
678  {
679  //
680  // This function can be called from a signal handler. We essentially can't do
681  // anything here because the state of the program can be pretty much anything,
682  // including inside a malloc() or gomp lock. So can't call any function which needs
683  // malloc or a #pragma omp critical.
684  //
685  // So just increment an atomic int.
686  //
687 
     // The mask keeps the counter in the range 0..127, so it wraps around instead of
     // growing without bound.
688  shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
689 
690  }
691 
692 }; // class TaskDispatcher
693 
694 
// Create an OMP task dispatcher, letting the compiler deduce the template arguments that
// appear in the function parameters (TaskCData_, ResultsCollector_, LoggerType_ and
// CountIntType_). TaskType_ appears in no parameter, so the caller has to name it
// explicitly. The arguments are passed unchanged to the TaskDispatcher constructor, and
// the task logger type of the returned dispatcher is left at its default.
698 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
699  typename LoggerType_, typename CountIntType_ = int>
700 inline TaskDispatcher<TaskType_, TaskCData_, ResultsCollector_,
701  LoggerType_, CountIntType_>
702 makeTaskDispatcher(TaskCData_ * pcdata_, ResultsCollector_ * results_, LoggerType_ & logger_,
703  CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
704 {
705  // RVO should be rather obvious to the compiler
706  return TaskDispatcher<TaskType_, TaskCData_, ResultsCollector_,
707  LoggerType_, CountIntType_>(
708  pcdata_, results_, logger_, num_total_runs_, n_chunk_
709  );
710 }
711 
712 
713 
714 } // namespace OMP
715 } // namespace MultiProc
716 
717 } // namespace Tomographer
718 
719 
720 
721 
722 
723 #endif
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
Definition: multiprocomp.h:184
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Definition: multiprocomp.h:326
Base namespace for the Tomographer project.
Definition: densellh.h:44
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog()
Definition: multiprocomp.h:203
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Definition: multiprocomp.h:322
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
Definition: multiprocomp.h:659
Base logger class.
Definition: loggers.h:427
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
Definition: multiprocomp.h:330
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
Definition: multiprocomp.h:328
STL class.
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:448
ResultsCollector_ ResultsCollector
The type which is responsible to collect the final results of the individual tasks.
Definition: multiprocomp.h:324
TaskDispatcher< TaskType_, TaskCData_, ResultsCollector_, LoggerType_, CountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, ResultsCollector_ *results_, LoggerType_ &logger_, CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
Create an OMP task dispatcher. Useful if you want C++'s template argument deduction mechanism...
Definition: multiprocomp.h:702
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType n_chunk_)
Task dispatcher constructor.
Definition: multiprocomp.h:541
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
Definition: multiprocomp.h:332
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
Definition: multiprocomp.h:320
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
Definition: multiprocomp.h:339
Some C++ utilities, with a tad of C++11 tricks.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
void run() TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Run the specified tasks.
Definition: multiprocomp.h:551
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:351
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Definition: multiprocomp.h:170
Dispatches tasks to parallel threads using OpenMP.
Definition: multiprocomp.h:314
STL class.
std::enable_if< dummy &&Logger::LoggerTraits< BaseLogger >::HasFilterByOrigin, bool >::type filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin()
Definition: multiprocomp.h:216
Utilities for logging messages.
void requestStatusReport()
Request a status report.
Definition: multiprocomp.h:677
A complete status report, abstract version.
Definition: multiproc.h:81