Tomographer v1.0a
Tomographer C++ Framework Documentation
multiprocomp.h
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2015 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24  * SOFTWARE.
25  */
26 
27 #ifndef MULTIPROCOMP_H
28 #define MULTIPROCOMP_H
29 
30 #include <csignal>
31 
32 #ifdef TOMOGRAPHER_HAVE_OMP
33 #include <omp.h>
34 #else
35 inline constexpr int omp_get_thread_num() { return 0; }
36 inline constexpr int omp_get_num_threads() { return 1; }
37 #endif
38 
39 
49 namespace Tomographer {
50 
51 
52 
53 namespace MultiProc
54 {
56 namespace OMP
57 {
58 
59  namespace tomo_internal {
60 
75  template<typename BaseLogger, bool baseLoggerIsThreadSafe>
76  struct ThreadSanitizerLoggerHelper
77  {
78  static inline void emit_log(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
79  {
80 #pragma omp critical
81  {
82  //printf("ThreadSanitizerLoggerHelper::emit_log(%d, %s, %s) -- OMP CRITICAL\n", level, origin, msg.c_str());
83  baselogger.emit_log(level, origin, msg);
84  }
85  }
86  static inline bool filter_by_origin(BaseLogger & baselogger, int level, const char * origin)
87  {
88  bool ok = true;
89 #pragma omp critical
90  {
91  //printf("ThreadSanitizerLoggerHelper::filter_by_origin(%d, %s) -- OMP CRITICAL\n", level, origin);
92  ok = baselogger.filter_by_origin(level, origin);
93  }
94  return ok;
95  }
96  };
97 
98  //
99  // specialize the helper for when logging to a thread-safe base logger. No critical
100  // section needed because the logger is already thread-safe.
101  //
102  template<typename BaseLogger>
103  struct ThreadSanitizerLoggerHelper<BaseLogger, true>
104  {
105  static inline void emit_log(BaseLogger & baselogger, int level, const char * origin, const std::string & msg)
106  {
107  //printf("ThreadSanitizerLoggerHelper::emit_log(%d, %s, %s) -- NORMAL\n", level, origin, msg.c_str());
108  baselogger.emit_log(level, origin, msg);
109  }
110  static inline bool filter_by_origin(BaseLogger & baselogger, int level, const char * origin)
111  {
112  //printf("ThreadSanitizerLoggerHelper::filter_by_origin(%d, %s) -- NORMAL\n", level, origin);
113  return baselogger.filter_by_origin(level, origin);
114  }
115  };
116 
117  } // namespace tomo_internal
118 
119 
166  template<typename BaseLogger>
167  class ThreadSanitizerLogger : public Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >
168  {
169  BaseLogger & _baselogger;
170  public:
171 
172  template<typename... MoreArgs>
173  ThreadSanitizerLogger(BaseLogger & logger, MoreArgs...)
174  // NOTE: pass the baselogger's level on here. The ThreadSanitizerLogger's level is
175  // this one, and is fixed and cannot be changed while running.
176  : Logger::LoggerBase<ThreadSanitizerLogger<BaseLogger> >(logger.level()),
177  _baselogger(logger)
178  {
179  // when you have to debug the log mechanism.... lol
180  //printf("ThreadSanitizerLogger(): object created\n");
181  //_baselogger.debug("ThreadSanitizerLogger()", "log from constructor.");
182  //emit_log(Logger::DEBUG, "ThreadSanitizerLogger!", "emit_log from constructor");
183  //LoggerBase<ThreadSanitizerLogger<BaseLogger> >::debug("ThreadSanitizerLogger", "debug from constructor");
184  }
185 
186  ~ThreadSanitizerLogger()
187  {
188  }
189 
190  inline void emit_log(int level, const char * origin, const std::string& msg)
191  {
192  //printf("ThreadSanitizerLogger::emit_log(%d, %s, %s)\n", level, origin, msg.c_str());
193  tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
194  Logger::LoggerTraits<BaseLogger>::IsThreadSafe>
195  ::emit_log(
196  _baselogger, level, origin, msg
197  );
198  }
199 
200  template<bool dummy = true>
201  inline typename std::enable_if<dummy && Logger::LoggerTraits<BaseLogger>::HasFilterByOrigin, bool>::type
202  filter_by_origin(int level, const char * origin) const
203  {
204  return tomo_internal::ThreadSanitizerLoggerHelper<BaseLogger,
205  Logger::LoggerTraits<BaseLogger>::IsThreadSafe>
206  ::filter_by_origin(
207  _baselogger, level, origin
208  );
209  }
210  };
211 
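// --- Usage sketch (editor's note; not part of the original multiprocomp.h) ---
// ThreadSanitizerLogger wraps a non-thread-safe base logger so that several OMP
// threads can log concurrently: calls are serialized in an omp critical section,
// unless LoggerTraits<BaseLogger>::IsThreadSafe says the base logger is already
// thread-safe. A minimal sketch, assuming a hypothetical `MyPlainLogger` type
// that models the Logger::LoggerBase interface:
//
//   MyPlainLogger baselogger;                       // not thread-safe by itself
//   #pragma omp parallel
//   {
//     ThreadSanitizerLogger<MyPlainLogger> tslogger(baselogger);
//     tslogger.debug("example()", "hello from thread %d", omp_get_thread_num());
//   }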
212 } // namespace OMP
213 } // namespace MultiProc
214 
215 namespace Logger {
222  template<typename BaseLogger>
223  struct LoggerTraits<MultiProc::OMP::ThreadSanitizerLogger<BaseLogger> > : public LoggerTraits<BaseLogger>
224  {
225  enum {
226  // explicitly require our logger instance to store its level. The level cannot be
227  // changed.
228  HasOwnGetLevel = 0,
229  IsThreadSafe = 1
230  };
231  };
232 } // namespace Logger
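// --- Usage sketch (editor's note; not part of the original multiprocomp.h) ---
// If a custom base logger is itself thread-safe, declaring this through its
// LoggerTraits makes ThreadSanitizerLogger pick the specialization above that
// forwards calls directly, with no omp critical section. `MyThreadSafeLogger` is
// a hypothetical logger type; any other trait members it needs are omitted here.
//
//   namespace Tomographer { namespace Logger {
//     template<>
//     struct LoggerTraits<MyThreadSafeLogger>
//     {
//       enum { IsThreadSafe = 1 };
//     };
//   } } // namespace Tomographer::Logger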
233 
234 
235 namespace MultiProc {
236 namespace OMP {
237 
240  template<typename TaskStatusReportType>
241  struct FullStatusReport   // a complete status report of all currently running tasks/threads
242  {
246  int num_completed;   // number of completed tasks
248  int num_total_runs;   // total number of tasks to perform
250  int num_active_working_threads;   // number of threads currently working on a task
252  int num_threads;   // number of spawned threads (some may be idle)
261  std::vector<bool> tasks_running;   // for each spawned thread, whether it is currently running a task
270  std::vector<TaskStatusReportType> tasks_reports;   // the raw status report submitted by each thread
271  };
272 
273 
274 
327  template<typename Task_, typename ConstantDataType_, typename ResultsCollector_,
328  typename Logger_, typename CountIntType_ = int,
329  typename TaskLogger_ = ThreadSanitizerLogger<Logger_> >
330  class TaskDispatcher   // dispatches tasks to parallel threads using OpenMP
331  {
332  public:
333  typedef Task_ Task;
334  typedef typename Task::StatusReportType TaskStatusReportType;
335  typedef ConstantDataType_ ConstantDataType;
336  typedef ResultsCollector_ ResultsCollector;
337  typedef Logger_ Logger;
338  typedef CountIntType_ CountIntType;
339  typedef TaskLogger_ TaskLogger;
341 
342  typedef std::function<void(const FullStatusReport<TaskStatusReportType>&)> FullStatusReportCallbackType;
343 
344  private:
345 
347  struct thread_shared_data {
348  thread_shared_data(const ConstantDataType *pcdata_, ResultsCollector * results_, Logger & logger_,
349  CountIntType num_total_runs_, CountIntType n_chunk_)
350  : pcdata(pcdata_),
351  results(results_),
352  logger(logger_),
353  status_report_underway(false),
354  status_report_initialized(false),
355  status_report_counter(0),
356  status_report_numreportsrecieved(0),
357  status_report_full(),
358  status_report_user_fn(),
359  num_total_runs(num_total_runs_), n_chunk(n_chunk_), num_completed(0),
360  num_active_working_threads(0)
361  { }
362 
363  const ConstantDataType * pcdata;
364  ResultsCollector * results;
365  Logger & logger;
366 
367  bool status_report_underway;
368  bool status_report_initialized;
369  volatile std::sig_atomic_t status_report_counter;
370  CountIntType status_report_numreportsrecieved;
371 
372  FullStatusReport<TaskStatusReportType> status_report_full;
373  FullStatusReportCallbackType status_report_user_fn;
374 
375  CountIntType num_total_runs;
376  CountIntType n_chunk;
377  CountIntType num_completed;
378 
379  CountIntType num_active_working_threads;
380  };
382  struct thread_private_data
383  {
384  thread_shared_data * shared_data;
385 
386  TaskLogger * logger;
387 
388  CountIntType kiter;
389  CountIntType local_status_report_counter;
390 
391  inline bool status_report_requested()
392  {
393  //fprintf(stderr, "status_report_requested(), shared_data=%p\n", shared_data);
394  return (int)local_status_report_counter != (int)shared_data->status_report_counter;
395  }
396 
397  inline void submit_status_report(const TaskStatusReportType &statreport)
398  {
399  if ((int)local_status_report_counter == (int)shared_data->status_report_counter) {
400  // error: task submitted an unsolicited report
401  logger->warning("OMP TaskDispatcher/taskmanageriface", "Task submitted unsolicited status report");
402  return;
403  }
404 
405 #pragma omp critical
406  {
407  bool ok = true; // whether to proceed or not
408 
409  // we've reacted to the given "signal"
410  local_status_report_counter = shared_data->status_report_counter;
411 
412  // add our status report to being-prepared status report in the shared data
413  int threadnum = omp_get_thread_num();
414 
415  //
416  // If we're the first reporting thread, we need to initiate the status reporting
417  // procedure and initialize the general data
418  //
419  if (!shared_data->status_report_initialized) {
420 
421  //
422  // Check that we indeed have to submit a status report.
423  //
424  if (shared_data->status_report_underway) {
425  // status report already underway!
426  logger->warning("OMP TaskDispatcher/taskmanageriface",
427  "status report already underway!");
428  ok = false;
429  }
430  if (!shared_data->status_report_user_fn) {
431  // no user handler set
432  logger->warning("OMP TaskDispatcher/taskmanageriface",
433  "no user status report handler set!"
434  " call set_status_report_handler() first.");
435  ok = false;
436  }
437 
438  // since we can't return out of an OpenMP critical section, we use an if block.
439  if (ok) {
440 
441  shared_data->status_report_underway = true;
442  shared_data->status_report_initialized = true;
443 
444  // initialize status report object & overall data
445  shared_data->status_report_full = FullStatusReport<TaskStatusReportType>();
446  shared_data->status_report_full.num_completed = shared_data->num_completed;
447  shared_data->status_report_full.num_total_runs = shared_data->num_total_runs;
448  shared_data->status_report_full.num_active_working_threads = shared_data->num_active_working_threads;
449  int num_threads = omp_get_num_threads();
450  shared_data->status_report_full.num_threads = num_threads;
451 
452  // initialize task-specific reports
453  // fill our lists with default-constructed values & set all running to false.
454  shared_data->status_report_full.tasks_running.clear();
455  shared_data->status_report_full.tasks_reports.clear();
456  shared_data->status_report_full.tasks_running.resize(num_threads, false);
457  shared_data->status_report_full.tasks_reports.resize(num_threads);
458  logger->debug("OMP TaskDispatcher/taskmanageriface", "vectors resized to %lu & %lu, resp.",
459  shared_data->status_report_full.tasks_running.size(),
460  shared_data->status_report_full.tasks_reports.size());
461  shared_data->status_report_numreportsrecieved = 0;
462  }
463 
464  } // status_report_initialized
465 
466  // if we're the first reporting thread, then maybe ok was set to false above, so
467  // check again.
468  if (ok) {
469 
470  //
471  // Report the data corresponding to this thread.
472  //
473  logger->debug("OMP TaskDispatcher/taskmanageriface", "threadnum=%ld, tasks_reports.size()=%ld",
474  (long)threadnum, (long)shared_data->status_report_full.tasks_reports.size());
475 
476  assert(0 <= threadnum && (std::size_t)threadnum < shared_data->status_report_full.tasks_reports.size());
477 
478  shared_data->status_report_full.tasks_running[threadnum] = true;
479  shared_data->status_report_full.tasks_reports[threadnum] = statreport;
480 
481  ++ shared_data->status_report_numreportsrecieved;
482 
483  if (shared_data->status_report_numreportsrecieved == shared_data->num_active_working_threads) {
484  // the report is ready to be transmitted to the user: go!
485  shared_data->status_report_user_fn(shared_data->status_report_full);
486  // all reports received: done --> reset our status_report_* flags
487  shared_data->status_report_numreportsrecieved = 0;
488  shared_data->status_report_underway = false;
489  shared_data->status_report_initialized = false;
490  shared_data->status_report_full.tasks_running.clear();
491  shared_data->status_report_full.tasks_reports.clear();
492  }
493  } // if ok
494  } // omp critical
495 
496  }
497  };
498 
499  thread_shared_data shared_data;
500 
501  public:
502  TaskDispatcher(ConstantDataType * pcdata_, ResultsCollector * results_, Logger & logger_,
503  CountIntType num_total_runs_, CountIntType n_chunk_)
504  : shared_data(pcdata_, results_, logger_, num_total_runs_, n_chunk_)
505  {
506  }
507 
508  inline void run()
509  {
510  shared_data.results->init(shared_data.num_total_runs, shared_data.n_chunk, shared_data.pcdata);
511 
512  shared_data.logger.debug("run_omp_tasks()", "About to start parallel section.");
513 
514  // declaring these as "const" causes a weird compiler error
515  // "`n_chunk' is predetermined `shared' for `shared'"
516  CountIntType num_total_runs = shared_data.num_total_runs;
517  CountIntType n_chunk = shared_data.n_chunk;
518  (void)n_chunk; // silence "unused variable" warning when compiling without OMP support
519 
520  CountIntType k = 0;
521 
522  thread_shared_data *shdat = &shared_data;
523  thread_private_data privdat;
524 
525 #pragma omp parallel default(none) private(k, privdat) shared(shdat, num_total_runs, n_chunk)
526  {
527  privdat.shared_data = shdat;
528  privdat.kiter = 0;
529 
530 #pragma omp for schedule(dynamic,n_chunk) nowait
531  for (k = 0; k < num_total_runs; ++k) {
532 
533 #pragma omp critical
534  {
535  ++ shdat->num_active_working_threads;
536  privdat.local_status_report_counter = shdat->status_report_counter;
537  }
538 
539  // construct a thread-safe logger we can use
540  TaskLogger threadsafelogger(shdat->logger, shdat->pcdata, k);
541 
542  // set up our thread-private data
543  privdat.kiter = k;
544  privdat.logger = &threadsafelogger;
545 
546  threadsafelogger.debug("run_omp_tasks()", "Running task #%lu ...", (unsigned long)k);
547 
548  // construct a new task instance
549  Task t(Task::get_input(k, shdat->pcdata), shdat->pcdata, threadsafelogger);
550 
551  // and run it
552  t.run(shdat->pcdata, threadsafelogger, &privdat);
553 
554 #pragma omp critical
555  {
556  shdat->results->collect_result(k, t.getResult(), shdat->pcdata);
557 
558  if ((int)privdat.local_status_report_counter != (int)shdat->status_report_counter) {
559  // the task missed the status report request... act as if we had provided a
560  // report, but don't actually provide one.
561  ++ shdat->status_report_numreportsrecieved;
562  }
563 
564  ++ shdat->num_completed;
565  -- shdat->num_active_working_threads;
566  }
567  }
568  }
569 
570  shared_data.results->runs_finished(num_total_runs, shared_data.pcdata);
571  }
572 
573 
601  template<typename Fn>
602  inline void set_status_report_handler(Fn fnstatus)
603  {
604 #pragma omp critical
605  {
606  shared_data.status_report_user_fn = fnstatus;
607  }
608  }
609 
610  inline void request_status_report()
611  {
612  //
613  // This function can be called from a signal handler. We essentially can't do
614  // anything here because the state of the program can be pretty much anything,
615  // including inside a malloc() or gomp lock. So we can't call any function that needs
616  // malloc or a #pragma omp critical.
617  //
618  // So just increment an atomic int.
619  //
620 
621  shared_data.status_report_counter = (shared_data.status_report_counter + 1) & 0x7f;
622 
623  }
624 
625  };
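// --- Usage sketch (editor's note; not part of the original multiprocomp.h) ---
// request_status_report() is safe to call from a signal handler: it only bumps a
// sig_atomic_t counter; the worker threads then assemble the report and pass it to
// the callback installed via set_status_report_handler(). A sketch, assuming a
// concrete dispatcher type `TaskDispatcherType` and a file-scope instance
// `dispatcher` (both hypothetical):
//
//   extern TaskDispatcherType dispatcher;
//
//   void on_sigint(int /*signum*/)
//   {
//     dispatcher.request_status_report();  // async-signal-safe: only sets a flag
//   }
//
//   // ... before launching the tasks:
//   dispatcher.set_status_report_handler(
//       [](const FullStatusReport<MyTask::StatusReportType> & report) {
//         std::fprintf(stderr, "completed %d/%d tasks\n",
//                      report.num_completed, report.num_total_runs);
//       });
//   std::signal(SIGINT, on_sigint);
//   dispatcher.run();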
626 
631  template<typename Task_, typename ConstantDataType_, typename ResultsCollector_,
632  typename Logger_, typename CountIntType_ = unsigned int>
633  inline TaskDispatcher<Task_, ConstantDataType_, ResultsCollector_,
634  Logger_, CountIntType_>
635  makeTaskDispatcher(ConstantDataType_ * pcdata_, ResultsCollector_ * results_, Logger_ & logger_,
636  CountIntType_ num_total_runs_, CountIntType_ n_chunk_)
637  {
638  // RVO should be rather obvious to the compiler
639  return TaskDispatcher<Task_, ConstantDataType_, ResultsCollector_,
640  Logger_, CountIntType_>(
641  pcdata_, results_, logger_, num_total_runs_, n_chunk_
642  );
643  }
644 
645 } // namespace OMP
646 } // namespace MultiProc
647 
648 } // namespace Tomographer
649 
650 
651 
652 
653 
654 #endif
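The following usage sketch is an editor's addition, not part of multiprocomp.h. It illustrates the interfaces that TaskDispatcher expects from its Task and ResultsCollector template parameters, as they can be read off run() above: a static Task::get_input(), a (input, pcdata, logger) constructor, run(pcdata, logger, iface) and getResult() on the task side, and init(), collect_result() and runs_finished() on the collector side. MyCData, MyTask and MyResults are hypothetical placeholder types, and the logger object is assumed to come from loggers.h.

#include <vector>

struct MyCData {                       // constant data shared read-only by all tasks
  int num_repeats;
};

struct MyTask {
  struct StatusReportType { double progress; };          // what a running task reports

  // input for the k-th task; called as Task::get_input(k, pcdata) by the dispatcher
  static int get_input(int k, const MyCData * /*pcdata*/) { return k; }

  template<typename LoggerType>
  MyTask(int input, const MyCData * /*pcdata*/, LoggerType & /*logger*/)
    : _input(input), _result(0) { }

  // iface provides status_report_requested() and submit_status_report()
  template<typename LoggerType, typename TaskManagerIface>
  void run(const MyCData * pcdata, LoggerType & logger, TaskManagerIface * iface)
  {
    for (int i = 0; i < pcdata->num_repeats; ++i) {
      _result += _input;                                  // stand-in for the real work
      if (iface->status_report_requested()) {
        iface->submit_status_report(StatusReportType{ double(i) / pcdata->num_repeats });
      }
    }
    logger.debug("MyTask::run()", "task #%d done", _input);
  }

  double getResult() const { return _result; }

private:
  int _input;
  double _result;
};

struct MyResults {
  std::vector<double> values;
  void init(int num_total_runs, int /*n_chunk*/, const MyCData * /*pcdata*/)
  { values.resize(num_total_runs); }
  void collect_result(int k, double result, const MyCData * /*pcdata*/)
  { values[k] = result; }
  void runs_finished(int /*num_total_runs*/, const MyCData * /*pcdata*/) { }
};

// ... and then, given some logger object `logger` (its exact type is not shown here):
//
//   MyCData cdata;  cdata.num_repeats = 1000;
//   MyResults results;
//   auto dispatcher = Tomographer::MultiProc::OMP::makeTaskDispatcher<MyTask>(
//       &cdata, &results, logger, /*num_total_runs=*/100, /*n_chunk=*/8);
//   dispatcher.run();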