Tomographer  v5.4
Tomographer C++ Framework Documentation
multiprocthreads.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCTHREADS_H
29 #define MULTIPROCTHREADS_H
30 
31 #include <chrono>
32 #include <stdexcept>
33 #include <algorithm> // std::min
34 
35 #include <boost/exception/diagnostic_information.hpp>
36 
38 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
40 #include <tomographer/multiproc.h>
42 
43 #include <thread>
44 #include <mutex>
45 // Include these here only, because on MinGW with mingw-std-threads, this
46 // includes windows headers which messes up tomographer/tools/logger.h by
47 // defining ERROR preprocessor symbols and other sh***t...
48 #ifdef TOMOGRAPHER_USE_MINGW_STD_THREAD
49 # include <mingw.thread.h>
50 # include <mingw.mutex.h>
51 #endif
52 
53 
54 
64 namespace Tomographer {
65 namespace MultiProc {
66 namespace CxxThreads {
67 
68 
112 template<typename BaseLogger>
113 class TOMOGRAPHER_EXPORT ThreadSanitizerLogger
114  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
115 {
116 public:
117  static constexpr bool IsBaseLoggerThreadSafe = Logger::LoggerTraits<BaseLogger>::IsThreadSafe;
118 private:
119  BaseLogger & _baselogger;
120 
121  std::mutex * _mutex;
122 public:
123 
133  ThreadSanitizerLogger(BaseLogger & logger, std::mutex * mutex)
134  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's
135  // level is this one, and is fixed and cannot be changed while running.
136  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
137  _baselogger(logger),
138  _mutex(mutex)
139  {
140  }
141 
143  {
144  }
145 
146 
148  TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
149  inline void emitLog(int level, const char * origin, const std::string& msg)
150  {
151  _baselogger.emitLog(level, origin, msg);
152  }
153 
156  IsBaseLoggerThreadSafe)
157  bool filterByOrigin(int level, const char * origin) const
158  {
159  return _baselogger.filterByOrigin(level, origin);
160  }
161 
163  TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
164  inline void emitLog(int level, const char * origin, const std::string& msg)
165  {
166  std::lock_guard<std::mutex> lock(*_mutex);
167  _baselogger.emitLog(level, origin, msg);
168  }
169 
172  !IsBaseLoggerThreadSafe)
173  bool filterByOrigin(int level, const char * origin) const
174  {
175  std::lock_guard<std::mutex> lock(*_mutex);
176  return _baselogger.filterByOrigin(level, origin);
177  }
178 
179 };
180 
181 } // namespace CxxThreads
182 } // namespace MultiProc
183 
184 namespace Logger {
191 template<typename BaseLogger>
192 struct TOMOGRAPHER_EXPORT LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> >
193  : public LoggerTraits<BaseLogger>
194 {
196  enum {
199  HasOwnGetLevel = 0,
201  IsThreadSafe = 1
202  };
203 };
204 } // namespace Logger
205 
206 
207 namespace MultiProc {
208 namespace CxxThreads {
209 
210 
248 template<typename TaskType_, typename TaskCData_,
249  typename LoggerType_, typename TaskCountIntType_ = int>
250 class TOMOGRAPHER_EXPORT TaskDispatcher
252  TaskType_,
253  TaskCountIntType_
254  >
255 {
256 public:
259 
261  using typename Base::TaskType;
263  using typename Base::TaskResultType;
265  using typename Base::TaskStatusReportType;
267  using typename Base::TaskCountIntType;
269  using typename Base::FullStatusReportType;
270 
272  typedef TaskCData_ TaskCData;
273 
275  typedef LoggerType_ LoggerType;
276 
279 
285  using typename Base::FullStatusReportCallbackType;
286 
287 private:
288 
289  typedef typename Base::template ThreadSharedData<TaskCData, LoggerType>
290  ThreadSharedDataType;
291 
292  ThreadSharedDataType shared_data;
293 
294  struct CriticalSectionManager {
297  std::mutex user_mutex;
298 
300  std::mutex schedule_mutex;
301 
303  std::mutex status_report_mutex;
304 
305  template<typename Fn>
306  inline void critical_status_report(Fn && fn) {
307  std::lock_guard<std::mutex> lck(status_report_mutex);
308  fn();
309  }
310  template<typename Fn>
311  inline void critical_status_report_and_user_fn(Fn && fn) {
312  std::lock(status_report_mutex, user_mutex);
313  std::lock_guard<std::mutex> lck1(status_report_mutex, std::adopt_lock);
314  std::lock_guard<std::mutex> lck2(user_mutex, std::adopt_lock);
315  fn();
316  }
317  template<typename Fn>
318  inline void critical_status_report_and_schedule(Fn && fn) {
319  std::lock(status_report_mutex, schedule_mutex);
320  std::lock_guard<std::mutex> lck1(status_report_mutex, std::adopt_lock);
321  std::lock_guard<std::mutex> lck2(schedule_mutex, std::adopt_lock);
322  fn();
323  }
324  template<typename Fn>
325  inline void critical_schedule(Fn && fn) {
326  std::lock_guard<std::mutex> lck(schedule_mutex);
327  fn();
328  }
329  };
330 
331  CriticalSectionManager critical;
332 
333  typedef typename Base::template ThreadPrivateData<
334  ThreadSharedDataType,
336  CriticalSectionManager
337  >
338  ThreadPrivateDataType;
339 
340 public:
358  TaskDispatcher(TaskCData * pcdata, LoggerType & logger,
359  TaskCountIntType num_total_runs,
360  int num_threads = 0)
361  : shared_data(pcdata, logger, num_total_runs,
362  ((num_threads > 0) ? num_threads
363  : (int)std::min(num_total_runs, (TaskCountIntType)std::thread::hardware_concurrency())) )
364  {
365  }
366 
368  : shared_data(std::move(other.shared_data))
369  // critical(std::move(other.critical)) -- mutexes are not movable, so just
370  // use new ones... ugly :(
371  {
372  }
373 
374  ~TaskDispatcher()
375  {
376  }
377 
382  void run()
383  {
385  shared_data.logger);
386  logger.debug("Let's go!");
387 
388  shared_data.time_start = Base::StdClockType::now();
389 
390  logger.debug("Preparing for parallel runs");
391 
392  auto worker_fn_id = [&](const int thread_id) noexcept(true) {
393 
394  // construct a thread-safe logger we can use
395  TaskLoggerType threadsafelogger(shared_data.logger, & critical.user_mutex);
396 
397  Tomographer::Logger::LocalLogger<TaskLoggerType> locallogger(
398  logger.originPrefix()+logger.glue()+"worker",
399  threadsafelogger);
400 
401  ThreadPrivateDataType private_data(thread_id, & shared_data, locallogger,
402  critical);
403 
404  locallogger.longdebug([&](std::ostream & stream) {
405  stream << "Thread #" << thread_id
406  << ": thread-safe logger and private thread data set up";
407  }) ;
408 
409  {
410  // active working region. This thread now takes care of handling tasks.
411  this->run_worker_enter(private_data, shared_data);
412  auto _f0 = Tools::finally([&]() {
413  this->run_worker_exit(private_data, shared_data);
414  });
415 
416  for ( ;; ) {
417  // continue doing stuff until we stop
418 
419  if (shared_data.schedule.interrupt_requested) {
420  break;
421  }
422 
423  // get new task to perform
424  critical.critical_schedule([&]() {
425  if (shared_data.schedule.num_launched ==
426  shared_data.schedule.num_total_runs) {
427  private_data.task_id = -1; // all tasks already launched ->
428  // nothing else to do
429  return;
430  }
431  private_data.task_id = shared_data.schedule.num_launched;
432  ++ shared_data.schedule.num_launched ;
433  }) ;
434 
435  if ( private_data.task_id < 0 ) {
436  // all tasks already launched -> nothing else to do
437  break;
438  }
439 
440  // run this task.
441 
442  this->run_task(private_data, shared_data) ;
443 
444  } // for(;;)
445 
446  } // end of active working region, thread on longer serves to run tasks
447  // (--num_active_working_threads is executed at this point)
448 
449  // only master thread should make sure it continues to serve status report
450  // requests
451  if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
452 
453  this->master_continue_monitoring_status(private_data, shared_data) ;
454 
455  }
456 
457  } ; // worker_fn_id
458 
459  //
460  // now, prepare & launch the workers
461  //
462 
463  logger.debug("About to launch threads");
464 
465  std::vector<std::thread> threads;
466 
467  // thread_id = 0 is reserved for ourselves.
468  for (int thread_id = 1; thread_id < shared_data.schedule.num_threads;
469  ++thread_id) {
470  // NOTE: do NOT capture thread_id by reference!
471  threads.push_back( std::thread( [thread_id,worker_fn_id]() {
472  worker_fn_id(thread_id);
473  } ) );
474  }
475 
476  // also run stuff as master thread
477  worker_fn_id(0);
478 
479  std::for_each(threads.begin(), threads.end(),
480  [](std::thread & thread) { thread.join(); }) ;
481 
482  logger.debug("Threads finished");
483 
484  this->run_epilog(shared_data, logger);
485 
486  logger.debug("All done.");
487 
488  } // run()
489 
490 
491 
495  inline TaskCountIntType numTaskRuns() const {
496  return shared_data.schedule.num_total_runs;
497  }
498 
503  return shared_data.results;
504  }
505 
510  return *shared_data.results[(std::size_t)k];
511  }
512 
513 
526  {
527  std::lock_guard<std::mutex> lck(critical.status_report_mutex) ;
528  shared_data.status_report.user_fn = fnstatus;
529  }
530 
542  inline void requestStatusReport()
543  {
544  //
545  // This function can be called from a signal handler. We essentially can't
546  // do anything here because the state of the program can be pretty much
547  // anything, including inside a malloc() or thread lock.
548  //
549  // So just increment an atomic int.
550  //
551 
552  ++ shared_data.status_report.event_counter_user;
553  }
554 
562  inline void requestPeriodicStatusReport(int milliseconds)
563  {
564  std::lock_guard<std::mutex> lck(critical.status_report_mutex) ;
565  shared_data.status_report.periodic_interval = milliseconds;
566  }
567 
581  inline void requestInterrupt()
582  {
583  // set the atomic int
584  shared_data.schedule.interrupt_requested = 1;
585  }
586 
587 }; // class TaskDispatcher
588 
589 
590 
591 template<typename TaskType_, typename TaskCData_,
592  typename LoggerType_, typename TaskCountIntType_ = int>
593 inline
595 mkTaskDispatcher(TaskCData_ * pcdata_,
596  LoggerType_ & logger_,
597  TaskCountIntType_ num_total_runs_,
598  int num_threads_ = (int)std::thread::hardware_concurrency())
599 {
601  pcdata_, logger_, num_total_runs_, num_threads_
602  ) ;
603 }
604 
605 
606 
607 } // namespace CxxThreads
608 } // namespace MultiProc
609 
610 } // namespace Tomographer
611 
612 
613 
614 
615 
616 #endif
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
const TaskResultType & collectedTaskResult(std::size_t k) const
Get the result of a specific given task.
Local logger: avoid having to repeat origin at each emitted message.
Definition: loggers.h:1613
TaskCountIntType_ TaskCountIntType
Integer type used to count the number of tasks to run (or running)
TaskType::ResultType TaskResultType
The task result type.
Base namespace for the Tomographer project.
Definition: densellh.h:45
thread-local variables and stuff — also serves as TaskManagerIface
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
STL namespace.
Tomographer::MultiProc::ThreadCommon::TaskDispatcherBase< TaskType_, TaskCountIntType_ > Base
Base class, provides common functionality to all thread-based MutliProc implementations.
tomo_internal::FinalAction< F > finally(F f)
implementation of a finally clause, somewhat like in Python
Definition: cxxutil.h:93
T end(T... args)
Base logger class.
Definition: loggers.h:444
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
T hardware_concurrency(T... args)
#define TOMO_ORIGIN
Use this as argument for a Tomographer::Logger::LocalLogger constructor .
Definition: loggers.h:1608
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
TaskDispatcher(TaskCData *pcdata, LoggerType &logger, TaskCountIntType num_total_runs, int num_threads=0)
Task dispatcher constructor.
void requestStatusReport()
Request a one-time status report.
T push_back(T... args)
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
T join(T... args)
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
LocalLogger< BaseLoggerType > makeLocalLogger(std::string origin_fn_name, BaseLoggerType &baselogger)
Create a local logger.
Definition: loggers.h:2040
T lock(T... args)
Some C++ utilities, with a tad of C++11 tricks.
T move(T... args)
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
STL class.
Some common definitions for multiprocessing interfaces.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
T begin(T... args)
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:351
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
Dispatches tasks to parallel threads using C++11 native threads.
TaskCountIntType numTaskRuns() const
Total number of task run instances.
T for_each(T... args)
void requestInterrupt()
Request an immediate interruption of the tasks.
STL class.
STL class.
Provide common functionality to thread-based MultiProc implementations.
const std::vector< TaskResultType * > & collectedTaskResults() const
Get all the task results.
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:97
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.