Tomographer  v5.4
Tomographer C++ Framework Documentation
multiprocomp.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCOMP_H
29 #define MULTIPROCOMP_H
30 
31 #include <csignal>
32 #include <chrono>
33 #include <stdexcept>
34 
35 #ifdef _OPENMP
36 #include <omp.h>
37 #else
38 inline constexpr int omp_get_thread_num() { return 0; }
39 inline constexpr int omp_get_num_threads() { return 1; }
40 #endif
41 
42 #include <boost/exception/diagnostic_information.hpp>
43 
45 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
47 #include <tomographer/multiproc.h>
49 
50 
60 namespace Tomographer {
61 namespace MultiProc {
62 namespace OMP {
63 
64 
116 template<typename BaseLogger>
117 class TOMOGRAPHER_EXPORT ThreadSanitizerLogger
118  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
119 {
120 public:
121  static constexpr bool IsBaseLoggerThreadSafe = Logger::LoggerTraits<BaseLogger>::IsThreadSafe;
122 private:
123  BaseLogger & _baselogger;
124 
125 public:
126 
135  template<typename... MoreArgs>
136  ThreadSanitizerLogger(BaseLogger & logger, MoreArgs && ...)
137  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
138  // this one, and is fixed and cannot be changed while running.
139  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
140  _baselogger(logger)
141  {
142  }
143 
145  {
146  }
147 
148 
150  TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
151  inline void emitLog(int level, const char * origin, const std::string& msg)
152  {
153  _baselogger.emitLog(level, origin, msg);
154  }
155 
158  IsBaseLoggerThreadSafe)
159  bool filterByOrigin(int level, const char * origin) const
160  {
161  return _baselogger.filterByOrigin(level, origin);
162  }
163 
165  TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
166  inline void emitLog(int level, const char * origin, const std::string& msg)
167  {
168 #pragma omp critical
169  {
170  _baselogger.emitLog(level, origin, msg);
171  }
172  }
173 
176  !IsBaseLoggerThreadSafe)
177  bool filterByOrigin(int level, const char * origin) const
178  {
179  bool ok;
180 #pragma omp critical
181  {
182  ok = _baselogger.filterByOrigin(level, origin);
183  }
184 
185  return ok;
186  }
187 
188 };
189 
190 } // namespace OMP
191 } // namespace MultiProc
192 
193 namespace Logger {
200 template<typename BaseLogger>
201 struct TOMOGRAPHER_EXPORT LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> >
202  : public LoggerTraits<BaseLogger>
203 {
205  enum {
208  HasOwnGetLevel = 0,
210  IsThreadSafe = 1
211  };
212 };
213 } // namespace Logger
214 
215 
216 namespace MultiProc {
217 namespace OMP {
218 
219 
277 template<typename TaskType_, typename TaskCData_, typename LoggerType_,
278  typename TaskCountIntType_ = int,
279  typename TaskLoggerType_ = ThreadSanitizerLogger<LoggerType_> >
280 class TOMOGRAPHER_EXPORT TaskDispatcher
282  TaskType_,
283  TaskCountIntType_
284  >
285 {
286 public:
289 
291  using typename Base::TaskType;
293  using typename Base::TaskResultType;
295  using typename Base::TaskStatusReportType;
297  using typename Base::TaskCountIntType;
299  using typename Base::FullStatusReportType;
300 
302  typedef TaskCData_ TaskCData;
303 
305  typedef LoggerType_ LoggerType;
306 
308  typedef TaskLoggerType_ TaskLoggerType;
309 
315  using typename Base::FullStatusReportCallbackType;
316 
317 private:
318 
319  typedef typename Base::template ThreadSharedData<TaskCData, LoggerType>
320  ThreadSharedDataType;
321 
322  ThreadSharedDataType shared_data;
323  TaskCountIntType n_chunk;
324 
325  struct CriticalSectionManager {
326 
327  template<typename Fn>
328  inline void critical_status_report(Fn && fn) {
329 #pragma omp critical
330  {
331  fn();
332  }
333  }
334  template<typename Fn>
335  inline void critical_status_report_and_user_fn(Fn && fn) {
336 #pragma omp critical
337  {
338  fn();
339  }
340  }
341  template<typename Fn>
342  inline void critical_status_report_and_schedule(Fn && fn) {
343 #pragma omp critical
344  {
345  fn();
346  }
347  }
348  template<typename Fn>
349  inline void critical_schedule(Fn && fn) {
350 #pragma omp critical
351  {
352  fn();
353  }
354  }
355  };
356 
357  CriticalSectionManager critical;
358 
359  typedef typename Base::template ThreadPrivateData<
360  ThreadSharedDataType,
362  CriticalSectionManager
363  >
364  ThreadPrivateDataType;
365 
366 public:
387  TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_,
388  TaskCountIntType num_total_runs_,
389  TaskCountIntType n_chunk_ = 1)
390  : shared_data(pcdata_, logger_, num_total_runs_, 0), n_chunk(n_chunk_)
391  {
392  }
393 
395  : shared_data(std::move(x.shared_data)),
396  n_chunk(x.n_chunk)
397  {
398  }
399 
400  ~TaskDispatcher()
401  {
402  }
403 
408  void run()
409  {
411  shared_data.logger);
412  logger.debug("Let's go!");
413 
414  shared_data.time_start = Base::StdClockType::now();
415 
416  logger.debug("Preparing for parallel runs");
417 
418 #ifndef _OPENMP
419  logger.warning("OpenMP is disabled; tasks will run serially.");
420 #endif
421 
422  // declaring these as "const" causes a weird compiler error
423  // "`n_chunk' is predetermined `shared' for `shared'"
424  TaskCountIntType num_total_runs = shared_data.schedule.num_total_runs;
425  TaskCountIntType n_chunk_ = n_chunk;
426  (void)n_chunk_; // silence "unused variable" warning when compiling without OMP support
427 
428  TaskCountIntType k = 0;
429 
430  ThreadSharedDataType * shdat = & shared_data;
431 
432  const std::string logger_prefix = logger.originPrefix()+logger.glue()+"worker";
433  const std::string * logger_prefix_ptr = &logger_prefix;
434 
435  logger.debug("About to start parallel section");
436 
437 #pragma omp parallel default(none) private(k) shared(shdat, logger_prefix_ptr, num_total_runs, n_chunk_)
438  {
439  // construct a thread-safe logger we can use
440  TaskLoggerType threadsafelogger(shared_data.logger, shdat->pcdata, k);
441 
442  Tomographer::Logger::LocalLogger<TaskLoggerType> locallogger(*logger_prefix_ptr, threadsafelogger);
443 
444  ThreadPrivateDataType private_data(omp_get_thread_num(), & shared_data, locallogger, critical);
445 
446  private_data.shared_data = shdat;
447  private_data.task_id = -1;
448 
449  // master thread sets shared_data.schedule.num_threads ...
450 #pragma omp master
451  {
452  shdat->schedule.num_threads = omp_get_num_threads();
453  }
454 
455  // ... while other threads wait for master to be done
456 #pragma omp barrier
457 #pragma omp flush
458 
459 
460  //
461  // Register new parallel worker
462  //
463  this->run_worker_enter(private_data, *shdat);
464 
465  //
466  // The main, parallel FOR loop:
467  //
468 #pragma omp for schedule(dynamic,n_chunk) nowait
469  for (k = 0; k < num_total_runs; ++k) {
470 
471  private_data.task_id = k;
472 
473  this->run_task(private_data, shared_data);
474 
475  } // omp for
476 
477  //
478  // De-register parallel worker
479  //
480  this->run_worker_exit(private_data, *shdat);
481 
482 #pragma omp master
483  {
484  this->master_continue_monitoring_status(private_data, *shdat) ;
485  }
486 
487  } // omp parallel
488 
489  logger.debug("OpenMP parallel section finished");
490 
491  this->run_epilog(shared_data, logger) ;
492 
493  logger.debug("Done.");
494  }
495 
499  inline TaskCountIntType numTaskRuns() const {
500  return shared_data.schedule.num_total_runs;
501  }
502 
507  return shared_data.results;
508  }
509 
514  return *shared_data.results[k];
515  }
516 
517 
518 
532  {
533 #pragma omp critical
534  {
535  shared_data.status_report.user_fn = fnstatus;
536  }
537  }
538 
550  inline void requestStatusReport()
551  {
552  //
553  // This function can be called from a signal handler. We essentially can't
554  // do anything here because the state of the program can be pretty much
555  // anything, including inside a malloc() or gomp lock. So can't call any
556  // function which needs malloc or a #pragma omp critical.
557  //
558  // So just increment an atomic int.
559  //
560 
561  ++ shared_data.status_report.event_counter_user;
562 
563  }
564 
572  inline void requestPeriodicStatusReport(int milliseconds)
573  {
574 #pragma omp critical
575  {
576  shared_data.status_report.periodic_interval = milliseconds;
577  }
578  }
579 
593  inline void requestInterrupt()
594  {
595  shared_data.schedule.interrupt_requested = 1;
596  }
597 
598 
599 }; // class TaskDispatcher
600 
601 
605 template<typename TaskType_, typename TaskCData_,
606  typename LoggerType_, typename TaskCountIntType_ = int>
607 inline TaskDispatcher<TaskType_, TaskCData_,
608  LoggerType_, TaskCountIntType_>
609 makeTaskDispatcher(TaskCData_ * pcdata_, LoggerType_ & logger_,
610  TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_ = 1)
611 {
612  // RVO should be rather obvious to the compiler
613  return TaskDispatcher<TaskType_, TaskCData_,
614  LoggerType_, TaskCountIntType_>(
615  pcdata_, logger_, num_total_runs_, n_chunk_
616  );
617 }
618 
619 
620 
621 } // namespace OMP
622 } // namespace MultiProc
623 
624 } // namespace Tomographer
625 
626 
627 
628 
629 
630 #endif
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
Definition: multiprocomp.h:572
Local logger: avoid having to repeat origin at each emitted message.
Definition: loggers.h:1613
void requestStatusReport()
Request a status report.
Definition: multiprocomp.h:550
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
ThreadSanitizerLogger(BaseLogger &logger, MoreArgs &&...)
Constructor.
Definition: multiprocomp.h:136
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
Definition: densellh.h:45
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
Definition: multiprocomp.h:513
void run()
Run the specified tasks.
Definition: multiprocomp.h:408
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
Definition: multiprocomp.h:305
thread-local variables and stuff — also serves as TaskManagerIface
STL namespace.
Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase< TaskType_, TaskCountIntType_ > Base
Base class, provides common functionality to all thread-based MutliProc implementations.
Definition: multiprocomp.h:288
Base logger class.
Definition: loggers.h:444
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
#define TOMO_ORIGIN
Use this as argument for a Tomographer::Logger::LocalLogger constructor .
Definition: loggers.h:1608
STL class.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Definition: multiprocomp.h:506
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
Definition: multiprocomp.h:531
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Definition: loggers.h:2040
TaskDispatcher< TaskType_, TaskCData_, LoggerType_, TaskCountIntType_ > makeTaskDispatcher(TaskCData_ *pcdata_, LoggerType_ &logger_, TaskCountIntType_ num_total_runs_, TaskCountIntType_ n_chunk_=1)
Create an OMP task dispatcher. Useful if you want C++&#39;s template argument deduction mechanism...
Definition: multiprocomp.h:609
TaskLoggerType_ TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
Definition: multiprocomp.h:308
Some C++ utilities, with a tad of C++11 tricks.
T move(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
STL class.
Some common definitions for multiprocessing interfaces.
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:351
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.
Definition: multiprocomp.h:117
Dispatches tasks to parallel threads using OpenMP.
Definition: multiprocomp.h:280
TaskCountIntType numTaskRuns() const
Total number of task run instances.
Definition: multiprocomp.h:499
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
Definition: multiprocomp.h:151
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Definition: multiprocomp.h:302
void requestInterrupt()
Request an immediate interruption of the tasks.
Definition: multiprocomp.h:593
Provide common functionality to thread-based MultiProc implementations.
TaskDispatcher(TaskCData *pcdata_, LoggerType &logger_, TaskCountIntType num_total_runs_, TaskCountIntType n_chunk_=1)
Task dispatcher constructor.
Definition: multiprocomp.h:387
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:97