Tomographer  v5.2
Tomographer C++ Framework Documentation
multiprocthreads.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCTHREADS_H
29 #define MULTIPROCTHREADS_H
30 
31 #include <chrono>
32 #include <stdexcept>
33 
34 #include <boost/exception/diagnostic_information.hpp>
35 
37 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
39 #include <tomographer/multiproc.h>
41 
42 #include <thread>
43 #include <mutex>
44 // Include these here only, because on MinGW with mingw-std-threads, this
45 // includes windows headers which messes up tomographer/tools/logger.h by
46 // defining ERROR preprocessor symbols and other sh***t...
47 #ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD
48 # include <mingw.thread.h>
49 # include <mingw.mutex.h>
50 #endif
51 
52 
53 
63 namespace Tomographer {
64 namespace MultiProc {
65 namespace CxxThreads {
66 
67 
111 template<typename BaseLogger>
112 class TOMOGRAPHER_EXPORT ThreadSanitizerLogger
113  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
114 {
115 public:
116  static constexpr bool IsBaseLoggerThreadSafe = Logger::LoggerTraits<BaseLogger>::IsThreadSafe;
117 private:
118  BaseLogger & _baselogger;
119 
120  std::mutex * _mutex;
121 public:
122 
132  ThreadSanitizerLogger(BaseLogger & logger, std::mutex * mutex)
133  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's
134  // level is this one, and is fixed and cannot be changed while running.
135  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
136  _baselogger(logger),
137  _mutex(mutex)
138  {
139  }
140 
142  {
143  }
144 
145 
147  TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
148  inline void emitLog(int level, const char * origin, const std::string& msg)
149  {
150  _baselogger.emitLog(level, origin, msg);
151  }
152 
155  IsBaseLoggerThreadSafe)
156  bool filterByOrigin(int level, const char * origin) const
157  {
158  return _baselogger.filterByOrigin(level, origin);
159  }
160 
162  TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
163  inline void emitLog(int level, const char * origin, const std::string& msg)
164  {
165  std::lock_guard<std::mutex> lock(*_mutex);
166  _baselogger.emitLog(level, origin, msg);
167  }
168 
171  !IsBaseLoggerThreadSafe)
172  bool filterByOrigin(int level, const char * origin) const
173  {
174  std::lock_guard<std::mutex> lock(*_mutex);
175  return _baselogger.filterByOrigin(level, origin);
176  }
177 
178 };
179 
180 } // namespace CxxThreads
181 } // namespace MultiProc
182 
183 namespace Logger {
190 template<typename BaseLogger>
191 struct TOMOGRAPHER_EXPORT LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> >
192  : public LoggerTraits<BaseLogger>
193 {
195  enum {
198  HasOwnGetLevel = 0,
200  IsThreadSafe = 1
201  };
202 };
203 } // namespace Logger
204 
205 
206 namespace MultiProc {
207 namespace CxxThreads {
208 
209 
247 template<typename TaskType_, typename TaskCData_,
248  typename LoggerType_, typename TaskCountIntType_ = int>
249 class TOMOGRAPHER_EXPORT TaskDispatcher
251  TaskType_,
252  TaskCountIntType_
253  >
254 {
255 public:
258 
260  using typename Base::TaskType;
262  using typename Base::TaskResultType;
264  using typename Base::TaskStatusReportType;
266  using typename Base::TaskCountIntType;
268  using typename Base::FullStatusReportType;
269 
271  typedef TaskCData_ TaskCData;
272 
274  typedef LoggerType_ LoggerType;
275 
278 
284  using typename Base::FullStatusReportCallbackType;
285 
286 private:
287 
288  typedef typename Base::template ThreadSharedData<TaskCData, LoggerType>
289  ThreadSharedDataType;
290 
291  ThreadSharedDataType shared_data;
292 
293  struct CriticalSectionManager {
296  std::mutex user_mutex;
297 
299  std::mutex schedule_mutex;
300 
302  std::mutex status_report_mutex;
303 
304  template<typename Fn>
305  inline void critical_status_report(Fn && fn) {
306  std::lock_guard<std::mutex> lck(status_report_mutex);
307  fn();
308  }
309  template<typename Fn>
310  inline void critical_status_report_and_user_fn(Fn && fn) {
311  std::lock(status_report_mutex, user_mutex);
312  std::lock_guard<std::mutex> lck1(status_report_mutex, std::adopt_lock);
313  std::lock_guard<std::mutex> lck2(user_mutex, std::adopt_lock);
314  fn();
315  }
316  template<typename Fn>
317  inline void critical_status_report_and_schedule(Fn && fn) {
318  std::lock(status_report_mutex, schedule_mutex);
319  std::lock_guard<std::mutex> lck1(status_report_mutex, std::adopt_lock);
320  std::lock_guard<std::mutex> lck2(schedule_mutex, std::adopt_lock);
321  fn();
322  }
323  template<typename Fn>
324  inline void critical_schedule(Fn && fn) {
325  std::lock_guard<std::mutex> lck(schedule_mutex);
326  fn();
327  }
328  };
329 
330  CriticalSectionManager critical;
331 
332  typedef typename Base::template ThreadPrivateData<
333  ThreadSharedDataType,
335  CriticalSectionManager
336  >
337  ThreadPrivateDataType;
338 
339 public:
356  TaskDispatcher(TaskCData * pcdata, LoggerType & logger,
357  TaskCountIntType num_total_runs,
358  int num_threads = (int)std::thread::hardware_concurrency())
359  : shared_data(pcdata, logger, num_total_runs, num_threads)
360  {
361  }
362 
364  : shared_data(std::move(other.shared_data))
365  // critical(std::move(other.critical)) -- mutexes are not movable, so just
366  // use new ones... ugly :(
367  {
368  }
369 
370  ~TaskDispatcher()
371  {
372  }
373 
378  void run()
379  {
381  shared_data.logger);
382  logger.debug("Let's go!");
383 
384  shared_data.time_start = Base::StdClockType::now();
385 
386  logger.debug("Preparing for parallel runs");
387 
388  auto worker_fn_id = [&](const int thread_id) noexcept(true) {
389 
390  // construct a thread-safe logger we can use
391  TaskLoggerType threadsafelogger(shared_data.logger, & critical.user_mutex);
392 
393  Tomographer::Logger::LocalLogger<TaskLoggerType> locallogger(
394  logger.originPrefix()+logger.glue()+"worker",
395  threadsafelogger);
396 
397  ThreadPrivateDataType private_data(thread_id, & shared_data, locallogger,
398  critical);
399 
400  locallogger.longdebug([&](std::ostream & stream) {
401  stream << "Thread #" << thread_id
402  << ": thread-safe logger and private thread data set up";
403  }) ;
404 
405  {
406  // active working region. This thread now takes care of handling tasks.
407  this->run_worker_enter(private_data, shared_data);
408  auto _f0 = Tools::finally([&]() {
409  this->run_worker_exit(private_data, shared_data);
410  });
411 
412  for ( ;; ) {
413  // continue doing stuff until we stop
414 
415  if (shared_data.schedule.interrupt_requested) {
416  break;
417  }
418 
419  // get new task to perform
420  critical.critical_schedule([&]() {
421  if (shared_data.schedule.num_launched ==
422  shared_data.schedule.num_total_runs) {
423  private_data.task_id = -1; // all tasks already launched ->
424  // nothing else to do
425  return;
426  }
427  private_data.task_id = shared_data.schedule.num_launched;
428  ++ shared_data.schedule.num_launched ;
429  }) ;
430 
431  if ( private_data.task_id < 0 ) {
432  // all tasks already launched -> nothing else to do
433  break;
434  }
435 
436  // run this task.
437 
438  this->run_task(private_data, shared_data) ;
439 
440  } // for(;;)
441 
442  } // end of active working region, thread on longer serves to run tasks
443  // (--num_active_working_threads is executed at this point)
444 
445  // only master thread should make sure it continues to serve status report
446  // requests
447  if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
448 
449  this->master_continue_monitoring_status(private_data, shared_data) ;
450 
451  }
452 
453  } ; // worker_fn_id
454 
455  //
456  // now, prepare & launch the workers
457  //
458 
459  logger.debug("About to launch threads");
460 
461  std::vector<std::thread> threads;
462 
463  // thread_id = 0 is reserved for ourselves.
464  for (int thread_id = 1; thread_id < shared_data.schedule.num_threads;
465  ++thread_id) {
466  // NOTE: do NOT capture thread_id by reference!
467  threads.push_back( std::thread( [thread_id,worker_fn_id]() {
468  worker_fn_id(thread_id);
469  } ) );
470  }
471 
472  // also run stuff as master thread
473  worker_fn_id(0);
474 
475  std::for_each(threads.begin(), threads.end(),
476  [](std::thread & thread) { thread.join(); }) ;
477 
478  logger.debug("Threads finished");
479 
480  this->run_epilog(shared_data, logger);
481 
482  logger.debug("All done.");
483 
484  } // run()
485 
486 
487 
491  inline TaskCountIntType numTaskRuns() const {
492  return shared_data.schedule.num_total_runs;
493  }
494 
499  return shared_data.results;
500  }
501 
506  return *shared_data.results[(std::size_t)k];
507  }
508 
509 
522  {
523  std::lock_guard<std::mutex> lck(critical.status_report_mutex) ;
524  shared_data.status_report.user_fn = fnstatus;
525  }
526 
538  inline void requestStatusReport()
539  {
540  //
541  // This function can be called from a signal handler. We essentially can't
542  // do anything here because the state of the program can be pretty much
543  // anything, including inside a malloc() or thread lock.
544  //
545  // So just increment an atomic int.
546  //
547 
548  ++ shared_data.status_report.event_counter_user;
549  }
550 
558  inline void requestPeriodicStatusReport(int milliseconds)
559  {
560  std::lock_guard<std::mutex> lck(critical.status_report_mutex) ;
561  shared_data.status_report.periodic_interval = milliseconds;
562  }
563 
577  inline void requestInterrupt()
578  {
579  // set the atomic int
580  shared_data.schedule.interrupt_requested = 1;
581  }
582 
583 }; // class TaskDispatcher
584 
585 
586 
587 template<typename TaskType_, typename TaskCData_,
588  typename LoggerType_, typename TaskCountIntType_ = int>
589 inline
591 mkTaskDispatcher(TaskCData_ * pcdata_,
592  LoggerType_ & logger_,
593  TaskCountIntType_ num_total_runs_,
594  int num_threads_ = (int)std::thread::hardware_concurrency())
595 {
597  pcdata_, logger_, num_total_runs_, num_threads_
598  ) ;
599 }
600 
601 
602 
603 } // namespace CxxThreads
604 } // namespace MultiProc
605 
606 } // namespace Tomographer
607 
608 
609 
610 
611 
612 #endif
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
Local logger: avoid having to repeat origin at each emitted message.
Definition: loggers.h:1533
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
Definition: densellh.h:45
thread-local variables and stuff — also serves as TaskManagerIface
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
STL namespace.
Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase< TaskType_, TaskCountIntType_ > Base
Base class, provides common functionality to all thread-based MutliProc implementations.
tomo_internal::FinalAction< F > finally(F f)
implementation of a finally clause, somewhat like in Python
Definition: cxxutil.h:93
T end(T... args)
Base logger class.
Definition: loggers.h:444
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T hardware_concurrency(T... args)
TaskDispatcher(TaskCData *pcdata, LoggerType &logger, TaskCountIntType num_total_runs, int num_threads=(int) std::thread::hardware_concurrency())
Task dispatcher constructor.
#define TOMO_ORIGIN
Use this as argument for a Tomographer::Logger::LocalLogger constructor .
Definition: loggers.h:1528
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
void requestStatusReport()
Request a one-time status report.
T push_back(T... args)
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
T join(T... args)
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Definition: loggers.h:1960
T lock(T... args)
Some C++ utilities, with a tad of C++11 tricks.
T move(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
STL class.
Some common definitions for multiprocessing interfaces.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
T begin(T... args)
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:351
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Dispatches tasks to parallel threads using C++11 native threads.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
T for_each(T... args)
void requestInterrupt()
Request an immediate interruption of the tasks.
STL class.
STL class.
Provide common functionality to thread-based MultiProc implementations.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:85
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.