Loading [MathJax]/extensions/tex2jax.js
Tomographer v4.1
Tomographer C++ Framework Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
multiprocthreads.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MULTIPROCTHREADS_H
29 #define MULTIPROCTHREADS_H
30 
31 #include <chrono>
32 #include <stdexcept>
33 
34 #include <thread>
35 #include <mutex>
36 
37 #include <boost/exception/diagnostic_information.hpp>
38 
40 #include <tomographer/tools/cxxutil.h> // tomographer_assert()
42 #include <tomographer/multiproc.h>
43 
44 
45 
46 
56 namespace Tomographer {
57 namespace MultiProc {
58 namespace CxxThreads {
59 
60 
104 template<typename BaseLogger>
105 TOMOGRAPHER_EXPORT class ThreadSanitizerLogger
106  : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
107 {
108 public:
109  static constexpr bool IsBaseLoggerThreadSafe = Logger::LoggerTraits<BaseLogger>::IsThreadSafe;
110 private:
111  BaseLogger & _baselogger;
112 
113  std::mutex * _mutex;
114 public:
115 
124  template<typename... MoreArgs>
125  ThreadSanitizerLogger(BaseLogger & logger, std::mutex * mutex)
126  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
127  // this one, and is fixed and cannot be changed while running.
128  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
129  _baselogger(logger),
130  _mutex(mutex)
131  {
132  }
133 
135  {
136  }
137 
138 
140  TOMOGRAPHER_ENABLED_IF(IsBaseLoggerThreadSafe)
141  inline void emitLog(int level, const char * origin, const std::string& msg)
142  {
143  _baselogger.emitLog(level, origin, msg);
144  }
145 
148  IsBaseLoggerThreadSafe)
149  bool filterByOrigin(int level, const char * origin) const
150  {
151  return _baselogger.filterByOrigin(level, origin);
152  }
153 
155  TOMOGRAPHER_ENABLED_IF(!IsBaseLoggerThreadSafe)
156  inline void emitLog(int level, const char * origin, const std::string& msg)
157  {
158  std::lock_guard<std::mutex> lock(*_mutex);
159  _baselogger.emitLog(level, origin, msg);
160  }
161 
164  !IsBaseLoggerThreadSafe)
165  bool filterByOrigin(int level, const char * origin) const
166  {
167  std::lock_guard<std::mutex> lock(*_mutex);
168  return _baselogger.filterByOrigin(level, origin);
169  }
170 
171 };
172 
173 } // namespace CxxThreads
174 } // namespace MultiProc
175 
176 namespace Logger {
183 template<typename BaseLogger>
184 struct LoggerTraits<MultiProc::CxxThreads::ThreadSanitizerLogger<BaseLogger> > : public LoggerTraits<BaseLogger>
185 {
187  enum {
193  };
194 };
195 } // namespace Logger
196 
197 
198 namespace MultiProc {
199 namespace CxxThreads {
200 
201 
258 template<typename TaskType_, typename TaskCData_, typename ResultsCollector_,
259  typename LoggerType_, typename CountIntType_ = int>
260 TOMOGRAPHER_EXPORT class TaskDispatcher
261 {
262 public:
264  typedef TaskType_ TaskType;
266  typedef typename TaskType::StatusReportType TaskStatusReportType;
268  typedef TaskCData_ TaskCData;
270  typedef ResultsCollector_ ResultsCollector;
272  typedef LoggerType_ LoggerType;
274  typedef CountIntType_ CountIntType;
279 
286 
287 private:
288 
289  typedef
290 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__)
291  std::chrono::monotonic_clock // for GCC/G++ 4.6
292 #else
294 #endif
295  StdClockType;
296 
297  struct TaskInterruptedInnerException : public std::exception {
298  std::string msg;
299  public:
300  TaskInterruptedInnerException() : msg("Task Interrupted") { }
301  virtual ~TaskInterruptedInnerException() throw() { };
302  const char * what() const throw() { return msg.c_str(); }
303  };
304  struct TaskInnerException : public std::exception {
305  std::string msg;
306  public:
307  TaskInnerException(std::string msgexc) : msg("Task raised an exception: "+msgexc) { }
308  virtual ~TaskInnerException() throw() { };
309  const char * what() const throw() { return msg.c_str(); }
310  };
311 
313  struct thread_shared_data {
314  thread_shared_data(const TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
315  CountIntType num_total_runs, CountIntType num_threads)
316  : pcdata(pcdata_),
317  results(results_),
318  logger(logger_),
319  time_start(),
320  schedule(num_total_runs, num_threads),
321  status_report()
322  { }
323 
324  const TaskCData * pcdata;
325  std::mutex user_mutex; // mutex for IO, as well as interface user interaction (results collector etc.)
326 
327  ResultsCollector * results;
328  LoggerType & logger;
329 
330  StdClockType::time_point time_start;
331 
332  struct Schedule {
333  const CountIntType num_threads;
334  CountIntType num_active_working_threads;
335 
336  const CountIntType num_total_runs;
337  CountIntType num_completed;
338  CountIntType num_launched;
339 
340  std::sig_atomic_t interrupt_requested;
341  std::string inner_exception;
342 
343  std::mutex mutex;
344 
345  Schedule(CountIntType num_total_runs_, CountIntType num_threads_)
346  : num_threads(num_threads_),
347  num_active_working_threads(0),
348  num_total_runs(num_total_runs_),
349  num_completed(0),
350  num_launched(0),
351  interrupt_requested(0),
352  inner_exception(),
353  mutex()
354  {
355  }
356  };
357  Schedule schedule;
358 
359  struct StatusReport {
360  bool underway;
361  bool initialized;
362  bool ready;
363  int periodic_interval;
364  CountIntType numreportsrecieved;
365  FullStatusReportType full_report;
366  FullStatusReportCallbackType user_fn;
367 
368  std::sig_atomic_t counter;
369 
370  std::mutex mutex;
371 
372  StatusReport()
373  : underway(false),
374  initialized(false),
375  ready(false),
376  periodic_interval(-1),
377  numreportsrecieved(0),
378  full_report(),
379  user_fn(),
380  counter(0),
381  mutex()
382  {
383  }
384  };
385  StatusReport status_report;
386 
387  template<typename Struct, typename Fn>
388  void with_lock(Struct & s, Fn fn) {
389  std::lock_guard<std::mutex> lock(s.mutex);
390  fn(s);
391  }
392  };
393 
395  struct thread_private_data
396  {
397  const CountIntType thread_id;
398 
399  thread_shared_data * shared_data;
400 
401  TaskLoggerType & logger;
402 
403  CountIntType task_id;
404  CountIntType local_status_report_counter;
405 
406  thread_private_data(CountIntType thread_id_, thread_shared_data * shared_data_, TaskLoggerType & logger_)
407  : thread_id(thread_id_),
408  shared_data(shared_data_),
409  logger(logger_),
410  task_id(-1),
411  local_status_report_counter(0)
412  {
413  }
414 
415  inline bool statusReportRequested() const
416  {
417  if (shared_data->schedule.interrupt_requested) {
418  logger.longdebug("CxxThreads::thread_private_data::statusReportRequested()",
419  "tasks interrupt has been requested");
420  throw TaskInterruptedInnerException();
421  }
422 
423  //
424  // if we're the master thread, we have some admin to do.
425  //
426  if (thread_id == 0) {
427  // Update the status_report_counter according to whether
428  // we should provoke a periodic status report
429  if (shared_data->status_report.periodic_interval > 0 && shared_data->status_report.user_fn) {
430  _master_thread_update_status_report_periodic_interval_counter();
431  }
432 
433  // if we're the master thread, then also check if there is a status report ready
434  // to be sent.
435  if (shared_data->status_report.ready) {
436  logger.longdebug("Tomographer::MultiProc::CxxThreads::thread_private_data::statusReportRequested()",
437  "Status report is ready.");
438 
439  // guard this block for status_report access
440  std::lock(shared_data->status_report.mutex, shared_data->user_mutex);
441  std::lock_guard<std::mutex> lck1(shared_data->status_report.mutex, std::adopt_lock);
442  std::lock_guard<std::mutex> lck2(shared_data->user_mutex, std::adopt_lock);
443 
444  // call user-defined status report handler
445  shared_data->status_report.user_fn(shared_data->status_report.full_report);
446  // all reports recieved: done --> reset our status_report flags
447  shared_data->status_report.numreportsrecieved = 0;
448  shared_data->status_report.underway = false;
449  shared_data->status_report.initialized = false;
450  shared_data->status_report.ready = false;
451  shared_data->status_report.full_report.workers_running.clear();
452  shared_data->status_report.full_report.workers_reports.clear();
453  }
454  } // master thread
455 
456  return (int)local_status_report_counter != (int)shared_data->status_report.counter;
457  }
458 
459  // internal use only:
460  inline void _master_thread_update_status_report_periodic_interval_counter() const
461  {
462  shared_data->status_report.counter = (uint32_t)(
463  (std::chrono::duration_cast<std::chrono::milliseconds>(
464  StdClockType::now().time_since_epoch()
465  ).count() / shared_data->status_report.periodic_interval) & 0x00FFFFFF
466  ) << 6;
467  // the (x << 6) (equivalent to (x * 64)) allows individual increments from
468  // unrelated additional requestStatusReport() to be taken into account (allows 64
469  // such additional requests per periodic status report)
470  }
471 
472  inline void submitStatusReport(const TaskStatusReportType &statreport)
473  {
474  if ((int)local_status_report_counter == (int)shared_data->status_report.counter) {
475  // error: task submitted unsollicited report
476  logger.warning("CxxThreads TaskDispatcher/taskmanageriface", "Task submitted unsollicited status report");
477  return;
478  }
479 
480  std::lock_guard<std::mutex> lockguard(shared_data->status_report.mutex) ;
481 
482  // we've reacted to the given "signal"
483  local_status_report_counter = shared_data->status_report.counter;
484 
485  // access to the local logger is fine as a different mutex is used
486  logger.longdebug("CxxThreads TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
487  stream << "status report received for thread #" << thread_id << ", treating it ... "
488  << "numreportsrecieved=" << shared_data->status_report.numreportsrecieved
489  << " num_active_working_threads=" << shared_data->schedule.num_active_working_threads ;
490  });
491 
492  //
493  // If we're the first reporting thread, we need to initiate the status reporing
494  // procedure and initialize the general data
495  //
496  if (!shared_data->status_report.initialized) {
497 
498  //
499  // Check that we indeed have to submit a status report.
500  //
501  if (shared_data->status_report.underway) {
502  // status report already underway!
503  logger.warning("CxxThreads TaskDispatcher/taskmanageriface", "status report already underway!");
504  return;
505  }
506  if (!shared_data->status_report.user_fn) {
507  // no user handler set
508  logger.warning("CxxThreads TaskDispatcher/taskmanageriface",
509  "no user status report handler set! Call setStatusReportHandler() first.");
510  return;
511  }
512 
513  shared_data->status_report.underway = true;
514  shared_data->status_report.initialized = true;
515  shared_data->status_report.ready = false;
516 
517  // initialize status report object & overall data
518  shared_data->status_report.full_report = FullStatusReportType();
519  shared_data->status_report.full_report.num_completed = shared_data->schedule.num_completed;
520  shared_data->status_report.full_report.num_total_runs = shared_data->schedule.num_total_runs;
521  shared_data->status_report.full_report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
522  StdClockType::now() - shared_data->time_start
523  ).count() * 1e-3;
524  int num_threads = shared_data->schedule.num_threads;
525 
526  // initialize task-specific reports
527  // fill our lists with default-constructed values & set all running to false.
528  shared_data->status_report.full_report.workers_running.clear();
529  shared_data->status_report.full_report.workers_reports.clear();
530  shared_data->status_report.full_report.workers_running.resize(num_threads, false);
531  shared_data->status_report.full_report.workers_reports.resize(num_threads);
532 
533  shared_data->status_report.numreportsrecieved = 0;
534 
535  logger.debug("CxxThreads TaskDispatcher/taskmanageriface", [&](std::ostream & stream) {
536  stream << "vectors resized to workers_running.size()="
537  << shared_data->status_report.full_report.workers_running.size()
538  << " and workers_reports.size()="
539  << shared_data->status_report.full_report.workers_reports.size()
540  << ".";
541  });
542  } // status_report.initialized
543 
544  //
545  // Report the data corresponding to this thread.
546  //
547  logger.debug("CxxThreads TaskDispatcher/taskmanageriface", "thread_id=%ld, workers_reports.size()=%ld",
548  (long)thread_id, (long)shared_data->status_report.full_report.workers_reports.size());
549 
550  tomographer_assert(0 <= thread_id &&
551  (std::size_t)thread_id < shared_data->status_report.full_report.workers_reports.size());
552 
553  shared_data->status_report.full_report.workers_running[thread_id] = true;
554  shared_data->status_report.full_report.workers_reports[thread_id] = statreport;
555 
556  ++ shared_data->status_report.numreportsrecieved;
557 
558  if (shared_data->status_report.numreportsrecieved == shared_data->schedule.num_active_working_threads) {
559  //
560  // the report is ready to be transmitted to the user: go!
561  //
562  // Don't send it quite yet, queue it for the master thread to send. We add
563  // this guarantee so that the status report handler can do things which only
564  // the master thread can do (e.g. in Python, call PyErr_CheckSignals()).
565  shared_data->status_report.ready = true;
566  }
567 
568  } // submitStatusReport()
569 
570  }; // thread_private_data
571 
572  thread_shared_data shared_data;
573 
574 public:
589  TaskDispatcher(TaskCData * pcdata_, ResultsCollector * results_, LoggerType & logger_,
590  CountIntType num_total_runs_,
591  CountIntType num_threads_ = std::thread::hardware_concurrency())
592  : shared_data(pcdata_, results_, logger_, num_total_runs_, num_threads_)
593  {
594  }
595 
600  void run()
601  {
602  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "Let's go!");
603  shared_data.time_start = StdClockType::now();
604 
605  shared_data.results->init(shared_data.schedule.num_total_runs, 1, shared_data.pcdata);
606 
607  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "preparing for parallel runs");
608 
609  typedef typename thread_shared_data::Schedule Schedule;
610 
611  auto worker_fn_id = [&](const int thread_id) noexcept(true) {
612 
613  // construct a thread-safe logger we can use
614  TaskLoggerType threadsafelogger(shared_data.logger, & shared_data.user_mutex);
615 
616  thread_private_data privdat(thread_id, & shared_data, threadsafelogger);
617 
618  // not sure an std::ostream would be safe here threadwise...?
619  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::run()",
620  "Thread #%d: thread-safe logger and private thread data set up", thread_id);
621 
622  {
623  // active working region. This thread takes care of handling tasks.
624 
625  shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
626  ++ schedule.num_active_working_threads;
627  });
628  auto _f0 = Tools::finally([&](){
629  shared_data.with_lock(shared_data.schedule, [](Schedule & schedule) {
630  -- schedule.num_active_working_threads;
631  });
632  });
633 
634  for ( ;; ) {
635  // continue doing stuff until we stop
636 
637  if (shared_data.schedule.interrupt_requested) {
638  break;
639  }
640 
641  // get new task to perform
642  shared_data.with_lock(shared_data.schedule, [&privdat](Schedule & schedule) {
643  if (schedule.num_launched == schedule.num_total_runs) {
644  privdat.task_id = -1; // all tasks already launched -> nothing else to do
645  return;
646  }
647  privdat.task_id = schedule.num_launched;
648  ++ schedule.num_launched ;
649  }) ;
650 
651  if ( privdat.task_id < 0 ) {
652  // all tasks already launched -> nothing else to do
653  break;
654  }
655 
656  // run this task.
657 
658  {
659  std::lock_guard<std::mutex> lk2(shared_data.status_report.mutex);
660  privdat.local_status_report_counter = shared_data.status_report.counter;
661  }
662 
663  try {
664 
665  _run_task(privdat, shared_data) ;
666 
667  } catch (TaskInterruptedInnerException & exc) {
668  privdat.logger.debug("CxxThreads::run()/worker", "Tasks interrupted.") ;
669  std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
670  shared_data.schedule.interrupt_requested = true;
671  break;
672  } catch (...) {
673  privdat.logger.debug("CxxThreads::run()/worker", "Exception caught inside task! "
674  + boost::current_exception_diagnostic_information()) ;
675  std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
676  shared_data.schedule.interrupt_requested = true;
677  shared_data.schedule.inner_exception += std::string("Exception caught inside task: ")
678  + boost::current_exception_diagnostic_information() + "\n";
679  break;
680  }
681 
682  { std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
683  ++ shared_data.schedule.num_completed;
684  }
685 
686  } // for(;;)
687 
688  } // end of active working region, thread on longer serves to run tasks
689  // (--num_active_working_threads is executed at this point)
690 
691  // only master thread should make sure it continues to serve status report requests
692  if (thread_id == 0 && !shared_data.schedule.interrupt_requested) {
693 
694  const int sleep_val = std::max(shared_data.status_report.periodic_interval, 200);
695 
696  while (shared_data.schedule.num_active_working_threads > 0) {
697 
699  try {
700  privdat.statusReportRequested();
701  } catch (...) {
702  privdat.logger.debug("CxxThreads::run()", "[master] Exception caught inside task!") ;
703  std::lock_guard<std::mutex> lk(shared_data.schedule.mutex) ;
704  shared_data.schedule.interrupt_requested = true;
705  shared_data.schedule.inner_exception += std::string("Exception caught inside task: ")
706  + boost::current_exception_diagnostic_information() + "\n";
707  privdat.logger.debug("CxxThreads::run()", "[master] Exception caught inside task -- handled.") ;
708  break;
709  }
710 
711  }
712  }
713 
714  } ; // worker_fn_id
715 
716  //
717  // now, prepare & launch the workers
718  //
719 
720  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "About to launch threads");
721 
722  std::vector<std::thread> threads;
723 
724  // thread_id = 0 is reserved for ourselves.
725  for (CountIntType thread_id = 1; thread_id < shared_data.schedule.num_threads; ++thread_id) {
726  threads.push_back( std::thread( [thread_id,worker_fn_id]() { // do NOT capture thread_id by reference!
727  worker_fn_id(thread_id);
728  } ) );
729  }
730 
731  // also run stuff as master thread
732  worker_fn_id(0);
733 
734  std::for_each(threads.begin(), threads.end(), [](std::thread & thread) { thread.join(); }) ;
735 
736  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "Threads finished");
737 
738  if (shared_data.schedule.inner_exception.size()) {
739  // interrupt was requested because of an inner exception, not an explicit interrupt request
740  throw std::runtime_error(shared_data.schedule.inner_exception);
741  }
742 
743  // if tasks were interrupted, throw the corresponding exception
744  if (shared_data.schedule.interrupt_requested) {
746  }
747 
748  shared_data.results->runsFinished(shared_data.schedule.num_total_runs, shared_data.pcdata);
749 
750  shared_data.logger.debug("MultiProc::CxxThreads::TaskDispatcher::run()", "Done.");
751  } // run()
752 
753 
754 
755 private:
756  void _run_task(thread_private_data & privdat, thread_shared_data & shared_data)
758  {
759 
760  // do not execute task if an interrupt was requested.
761  if (shared_data.schedule.interrupt_requested) {
762  return;
763  }
764 
765  // not sure an std::ostream would be safe here threadwise...?
766  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
767  "Run #%lu: querying CData for task input", (unsigned long)privdat.task_id);
768 
769  // See OMP implementation: getTaskInput() is not thread protected
770  //
771  // const auto input = [](thread_shared_data & x, task_id) {
772  // std::lock_guard<std::mutex> lck(x.user_mutex);
773  // return x.pcdata->getTaskInput(task_id);
774  // } (shared_data, privdat.task_id);
775  const auto input = shared_data.pcdata->getTaskInput(privdat.task_id);
776 
777  // not sure an std::ostream would be safe here threadwise...?
778  privdat.logger.debug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
779  "Running task #%lu ...", (unsigned long)privdat.task_id);
780 
781  // construct a new task instance
782  TaskType t(input, shared_data.pcdata, privdat.logger);
783 
784  // not sure an std::ostream would be safe here threadwise...?
785  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
786  "Task #%lu set up.", (unsigned long)privdat.task_id);
787 
788  // and run it
789  t.run(shared_data.pcdata, privdat.logger, &privdat);
790 
791  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()",
792  "Task #%lu finished, about to collect result.", (unsigned long)privdat.task_id);
793 
794  { std::lock_guard<std::mutex> lck(shared_data.user_mutex);
795  shared_data.results->collectResult(privdat.task_id, t.getResult(), shared_data.pcdata);
796  }
797 
798  // {
799  // std::lock_guard<std::mutex> lck(shared_data.status_report.mutex) ;
800  // if ((int)privdat.local_status_report_counter != (int)shared_data.status_report.counter) {
801  // // status report request missed by task... do as if we had provided a
802  // // report, but don't provide report.
803  // ++ shared_data.status_report.numreportsrecieved;
804  // }
805  // }
806 
807  privdat.logger.longdebug("Tomographer::MultiProc::CxxThreads::TaskDispatcher::_run_task()", "task done") ;
808  }
809 
810 public:
811 
822  inline void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
823  {
824  std::lock_guard<std::mutex> lck(shared_data.status_report.mutex) ;
825  shared_data.status_report.user_fn = fnstatus;
826  }
827 
838  inline void requestStatusReport()
839  {
840  //
841  // This function can be called from a signal handler. We essentially can't do
842  // anything here because the state of the program can be pretty much anything,
843  // including inside a malloc() or thread lock.
844  //
845  // So just increment an atomic int.
846  //
847 
848  shared_data.status_report.counter = (shared_data.status_report.counter + 1) & 0x7f;
849  }
850 
858  inline void requestPeriodicStatusReport(int milliseconds)
859  {
860  std::lock_guard<std::mutex> lck(shared_data.status_report.mutex) ;
861  shared_data.status_report.periodic_interval = milliseconds;
862  }
863 
875  inline void requestInterrupt()
876  {
877  // set the atomic int
878  shared_data.interrupt_requested = 1;
879  }
880 
881 
882 }; // class TaskDispatcher
883 
884 
885 
886 
887 } // namespace CxxThreads
888 } // namespace MultiProc
889 
890 } // namespace Tomographer
891 
892 
893 
894 
895 
896 #endif
FullStatusReport< TaskStatusReportType > FullStatusReportType
The type to use to generate a full status report of all running tasks.
LoggerType_ LoggerType
The logger type specified to the dispatcher (not necessarily thread-safe)
TaskDispatcher(TaskCData *pcdata_, ResultsCollector *results_, LoggerType &logger_, CountIntType num_total_runs_, CountIntType num_threads_=std::thread::hardware_concurrency())
Task dispatcher constructor.
CountIntType_ CountIntType
Integer type used to count the number of tasks to run (or running)
Base namespace for the Tomographer project.
Definition: densellh.h:45
void emitLog(int level, const char *origin, const std::string &msg)
Implementation of Logger::LoggerBase::emitLog() for a base logger which is thread-safe.
TaskType::StatusReportType TaskStatusReportType
The type used by a single task when providing a status report.
STL namespace.
T sleep_for(T... args)
tomo_internal::FinalAction< F > finally(F f)
implementation of a finally clause, somewhat like in Python
Definition: cxxutil.h:83
T duration_cast(T... args)
T end(T... args)
Base logger class.
Definition: loggers.h:428
T hardware_concurrency(T... args)
STL class.
ThreadSanitizerLogger(BaseLogger &logger, std::mutex *mutex)
Constructor.
T push_back(T... args)
#define TOMOGRAPHER_CXX_STACK_FORCE_REALIGN
Definition: cxxutil.h:393
std::function< void(const FullStatusReportType &)> FullStatusReportCallbackType
The relevant type for a callback function (or callable) which is provided with the full status report...
bool filterByOrigin(int level, const char *origin) const
Implementation of Logger::LoggerBase::filterByOrigin() for a base logger which is thread-safe...
void requestPeriodicStatusReport(int milliseconds)
Request a periodic status report.
T join(T... args)
void setStatusReportHandler(FullStatusReportCallbackType fnstatus)
assign a callable to be called whenever a status report is requested
void requestInterrupt()
Request an immediate interruption of the tasks.
void requestStatusReport()
Request a status report.
T lock(T... args)
void warning(const char *origin, const char *fmt,...)
emit a warning message
Definition: loggers.h:1018
Some C++ utilities, with a tad of C++11 tricks.
void debug(const char *origin, const char *fmt,...)
emit a debug message
Definition: loggers.h:1064
T max(T... args)
STL class.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
ThreadSanitizerLogger< LoggerType_ > TaskLoggerType
A thread-safe logger type which is passed on to the child tasks.
STL class.
void longdebug(const char *origin, const char *fmt,...)
emit a very verbose debugging message
Definition: loggers.h:1087
Some common definitions for multiprocessing interfaces.
T begin(T... args)
Traits template struct to be specialized for specific Logger implementations.
Definition: loggers.h:352
T c_str(T... args)
Dispatches tasks to parallel threads using C++11 native threads.
LoggerBase(int level_=INFO)
Construct the base logger object.
Definition: loggers.h:456
ResultsCollector_ ResultsCollector
The type which is responsible to collect the final results of the individual tasks.
int level() const
Get the log level set for this logger.
Definition: loggers.h:781
TaskCData_ TaskCData
The type which stores constant, shared data for all tasks to access.
T for_each(T... args)
STL class.
STL class.
#define tomographer_assert(...)
Assertion test macro.
Definition: cxxdefs.h:83
Utilities for logging messages.
A complete status report, abstract version.
Definition: multiproc.h:85
Wrapper logger to call non-thread-safe loggers from a multithreaded environment.