Tomographer  v5.4
Tomographer C++ Framework Documentation
multiprocmpi.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MPI_MULTIPROCMPI_H
29 #define MPI_MULTIPROCMPI_H
30 
31 #include <csignal>
32 
33 #include <string>
34 #include <chrono>
35 #include <exception>
36 #include <algorithm>
37 
38 #include <boost/exception_ptr.hpp>
39 #include <boost/exception/diagnostic_information.hpp>
40 #include <boost/serialization/serialization.hpp>
41 #include <boost/serialization/vector.hpp>
42 
43 #include <tomographer/tools/fmt.h>
45 #include <tomographer/multiproc.h>
46 #include <tomographer/multiprocthreadcommon.h> // TOMOGRAPHER_SLEEP_FOR_MS()
47 
48 #include <boost/mpi/environment.hpp>
49 #include <boost/mpi/communicator.hpp>
50 #include <boost/mpi/collectives.hpp>
51 
52 namespace mpi = boost::mpi;
53 
54 
55 
64 namespace Tomographer {
65 
66 namespace MultiProc {
67 
68 namespace MPI {
69 
70 namespace tomo_internal {
71 //
72 // don't define boost::serialization overloads directly for std::chrono classes,
73 // because the user or another library might be doing that already
74 //
75 template<typename ChronoDurationType>
76 struct SerializableDuration : public ChronoDurationType
77 {
78  SerializableDuration(typename ChronoDurationType::rep count) : ChronoDurationType(count) { }
79  SerializableDuration(const ChronoDurationType & other) : ChronoDurationType(other) { }
80 
81 private:
82  friend boost::serialization::access;
83  template<typename Archive>
84  void save(Archive & ar, const unsigned int /*version*/) const
85  {
86  typename ChronoDurationType::rep thecount = ChronoDurationType::count();
87  ar & thecount;
88  }
89  template<typename Archive>
90  void load(Archive & ar, const unsigned int /*version*/)
91  {
92  typename ChronoDurationType::rep thecount;
93  ar & thecount;
94  *this = ChronoDurationType(thecount);
95  }
96  BOOST_SERIALIZATION_SPLIT_MEMBER()
97 };
98 } // tomo_internal
99 
100 
101 
102 
131 template<typename TaskType_, typename TaskCData_, typename BaseLoggerType_,
132  typename TaskCountIntType_ = int>
133 class TOMOGRAPHER_EXPORT TaskDispatcher
134 {
135 public:
136  typedef TaskType_ TaskType;
137  typedef typename TaskType::StatusReportType TaskStatusReportType;
138  typedef TaskCData_ TaskCData;
139  typedef BaseLoggerType_ BaseLoggerType;
140  typedef TaskCountIntType_ TaskCountIntType;
141 
142  typedef typename TaskType::ResultType TaskResultType;
143 
145 
147 
149 
150 private:
151 
153 
154  struct FullTaskResult
155  {
156  FullTaskResult(TaskCountIntType task_id_ = -1, TaskResultType * t = NULL,
157  std::string errmsg = std::string())
158  : task_id(task_id_), task_result(t), error_msg(errmsg) { }
159 
160  TaskCountIntType task_id;
161  TaskResultType * task_result; // NULL if task was interrupted
162  std::string error_msg;
163 
164  private:
165  friend boost::serialization::access;
166  template<typename Archive>
167  void serialize(Archive & a, unsigned int /*version*/)
168  {
169  a & task_id;
170  a & task_result;
171  a & error_msg;
172  }
173  };
174 
175  struct MasterWorkersController {
176  MasterWorkersController(TaskCountIntType num_total_runs_)
177  : num_total_runs(num_total_runs_),
178  num_tasks_completed(0),
179  num_tasks_launched(0),
180  num_workers_running(0),
181  workers_running(),
182  tasks_start_time(),
183  interrupt_requested(0),
184  interrupt_reacted(0),
185  full_task_results(),
186  task_results()
187  {
188  }
189 
190  inline void start(int num_workers)
191  {
192  // everything is left to do
193  num_tasks_completed = 0;
194  num_tasks_launched = 0;
195  num_workers_running = num_workers;
196  workers_running.resize((std::size_t)num_workers, 0);
197  // interruption flag
198  interrupt_requested = 0;
199  interrupt_reacted = 0;
200  // resize to # of processes
201  full_task_results.resize((std::size_t)num_total_runs, NULL);
202  task_results.resize((std::size_t)num_total_runs, NULL);
203  // et ... top chrono
204  tasks_start_time = StdClockType::now();
205  }
206 
207  inline TaskCountIntType pop_task()
208  {
209  if (num_tasks_launched >= num_total_runs) {
210  return -1;
211  }
212  TaskCountIntType task_id = num_tasks_launched;
213  ++num_tasks_launched;
214  return task_id; // or, as a C guy would say, return num_tasks_launched++
215  }
216 
217  bool get_interrupt_event_and_react()
218  {
219  if (interrupt_requested) {
220  interrupt_reacted = 1;
221  return true;
222  }
223  return false;
224  }
225 
226  const TaskCountIntType num_total_runs;
227  TaskCountIntType num_tasks_completed;
228  TaskCountIntType num_tasks_launched;
229 
230  int num_workers_running;
231  std::vector<int> workers_running;
232 
233  StdClockType::time_point tasks_start_time;
234 
235  volatile std::sig_atomic_t interrupt_requested; // could be written to by signal handler
236  std::sig_atomic_t interrupt_reacted;
237 
238  std::vector<FullTaskResult*> full_task_results;
239  std::vector<TaskResultType*> task_results; // for convenience, same as full_task_results[k].task_result's
240  };
241 
242  struct MasterStatusReportController {
243  MasterStatusReportController()
244  : event_counter(0),
245  reacted_event_counter(0),
246  in_preparation(false),
247  full_report(),
248  num_reports_waiting(0),
249  next_report_time(StdClockType::now()),
250  user_fn(),
251  periodic_interval(0)
252  {
253  }
254 
255  bool get_event_and_react() {
256  if (in_preparation) {
257  return false;
258  }
259  if (event_counter != reacted_event_counter) {
260  reacted_event_counter = event_counter;
261  return true;
262  }
263  return false;
264  }
265 
266  void reset() {
267  in_preparation = false;
268  full_report = FullStatusReportType();
269  num_reports_waiting = 0;
270  }
271 
272  volatile std::sig_atomic_t event_counter; // could be written to by signal handler
273  std::sig_atomic_t reacted_event_counter;
274 
275  bool in_preparation;
276 
277  FullStatusReportType full_report;
278  int num_reports_waiting;
279 
280  StdClockType::time_point next_report_time;
281 
282  FullStatusReportCallbackType user_fn;
283  int periodic_interval;
284  };
285 
286  struct TaskMgrIface {
287  TaskMgrIface(TaskDispatcher * dispatcher_)
288  : dispatcher(dispatcher_)
289  {
290  tomographer_assert(dispatcher != NULL) ;
291  }
292 
293  inline bool statusReportRequested() const
294  {
295  return dispatcher->check_mpi_messages_and_get_statusrequested();
296  }
297 
298  inline void submitStatusReport(const TaskStatusReportType &statreport)
299  {
300  dispatcher->submit_status_report(statreport);
301  }
302 
303  private:
304  TaskDispatcher * dispatcher;
305  friend class TaskDispatcher;
306  };
307 
308  enum {
309  _TAG_offset_number = 199, // our tag #s start from 200
310 
311  TAG_WORKER_REQUEST_NEW_TASK_ID,
312  TAG_MASTER_DELIVER_NEW_TASK_ID,
313 
314  TAG_MASTER_ORDER_INTERRUPT,
315  TAG_MASTER_ORDER_STATUS_REPORT,
316 
317  TAG_WORKER_SUBMIT_STATUS_REPORT,
318  TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT,
319  TAG_WORKER_SUBMIT_RESULT,
320 
321  TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE
322  };
323 
324  TaskCData * pcdata;
325  mpi::communicator & comm;
326  const bool is_master;
327 
329 
330  MasterWorkersController * ctrl;
331  MasterStatusReportController * ctrl_status_report;
332 
334  StdClockType::time_point next_mpi_probe_time;
335 
336  TaskMgrIface mgriface;
337 
338  class interrupt_tasks : public std::exception
339  {
340  std::string msg;
341  public:
342  interrupt_tasks(std::string msg_ = "Task Interrupted") : msg(msg_) { }
343  virtual ~interrupt_tasks() throw() { }
344  const char * what() const throw() { return msg.c_str(); }
345  };
346 
347 public:
355  TaskDispatcher(TaskCData * pcdata_, mpi::communicator & comm_, BaseLoggerType & logger_,
356  int num_task_runs)
357  : pcdata(pcdata_),
358  comm(comm_),
359  is_master(comm_.rank() == 0),
360  llogger("Tomographer::MultiProc::MPI::TaskDispatcher", logger_),
361  ctrl(NULL),
362  ctrl_status_report(NULL),
363  min_mpi_probe_dt(std::chrono::duration_cast<StdClockType::duration>(std::chrono::milliseconds(100))),
364  next_mpi_probe_time(StdClockType::now()),
365  mgriface(this)
366  {
367  if (is_master) {
368  ctrl = new MasterWorkersController(num_task_runs) ;
369  ctrl_status_report = new MasterStatusReportController;
370  }
371  }
372  ~TaskDispatcher()
373  {
374  if (ctrl != NULL) {
375  for ( auto r : ctrl->full_task_results ) {
376  if (r != NULL) {
377  if (r->task_result != NULL) {
378  delete r->task_result;
379  }
380  delete r;
381  }
382  }
383  }
384  }
385 
389  void run()
390  {
391  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
392 
393  // First, the pcdata should have been initialized by the master only. Our
394  // first job is to broadcast the data to all processes.
395  if (is_master) {
396  tomographer_assert(pcdata != NULL) ;
397  } else {
398  tomographer_assert(pcdata == NULL) ;
399  }
400  // broadcast pcdata...
401  mpi::broadcast(comm, pcdata, 0);
402  // other settings to synchronize
403  mpi::broadcast(comm, min_mpi_probe_dt, 0);
404  // now, pcdata and other settings are initialized on all workers
405 
406  // initialize some local settings
407  next_mpi_probe_time = StdClockType::now();
408 
409  logger.longdebug("pcdata is now broadcast; is_master=%c; min_mpi_probe_dt=%u",
410  (is_master?'y':'n'),
411  (unsigned int)std::chrono::duration_cast<std::chrono::milliseconds>(min_mpi_probe_dt).count());
412 
413  if (is_master) {
414  tomographer_assert(ctrl != NULL) ;
415  tomographer_assert(ctrl_status_report != NULL) ;
416  ctrl->start(comm.size());
417  } else {
418  tomographer_assert(ctrl == NULL) ;
419  tomographer_assert(ctrl_status_report == NULL) ;
420  }
421 
422  const auto worker_id = comm.rank();
423 
424  logger.debug([&](std::ostream & stream) {
425  stream << "Worker #" << worker_id << " up and running ...";
426  });
427 
428  bool interrupted = false;
429  std::string error_msg;
430 
431  try {
432 
433  run_worker();
434 
435  } catch (interrupt_tasks & e) {
436  // tasks were interrupted, sorry.
437  interrupted = true;
438  error_msg = e.what();
439  }
440 
441  //
442  // gather the results to the master process
443  //
444  if (is_master) {
445 
446  logger.debug("master done here, waiting for other processes to finish");
447 
448  // we stopped working ourselves
449  -- ctrl->num_workers_running;
450 
451  while ( ctrl->num_workers_running ) {
452  // continue monitoring running processes and gathering results
453  logger.longdebug("num_workers_running = %d", ctrl->num_workers_running);
454  try {
455  check_mpi_messages_and_get_statusrequested();
456  } catch(interrupt_tasks & e) {
457  // nothing to do but wait for others to finish...
458  }
459  // and sleep a bit
460  TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
461  }
462 
463  for (std::size_t k = 0; k < ctrl->full_task_results.size(); ++k) {
464  FullTaskResult * r = ctrl->full_task_results[k];
465  if (r->error_msg.size()) {
466  error_msg += "\nIn Worker #" + std::to_string(k) + ":\n" + r->error_msg;
467  }
468  }
469 
470  } else {
471 
472  // Notify master that we're outta here
473  comm.send(0, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
474 
475  }
476 
477  if (interrupted) {
478  throw TasksInterruptedException(error_msg);
479  }
480 
481  // all done
482  logger.debug("all done");
483  }
484 
485 private:
486  friend struct TaskMgrIface;
487 
488  // return TRUE if a status report was requested
489  inline bool check_mpi_messages_and_get_statusrequested(bool force = false)
490  {
491  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
492 
493  auto now = StdClockType::now();
494  if (!force && now < next_mpi_probe_time) {
495  return false; // skip this round, will do next
496  }
497  next_mpi_probe_time = now + min_mpi_probe_dt;
498 
499 
500  bool status_report_requested = false;
501 
502  if (is_master) {
503 
504  if (ctrl->get_interrupt_event_and_react()) {
505 
506  master_order_interrupt();
507  throw interrupt_tasks();
508 
509  }
510 
511  if (!ctrl->interrupt_requested) {
512 
513  // as master, we could still be asked to do some monitoring after an
514  // interrupt was requested, so be careful about this.
515 
516  if ( ctrl_status_report->get_event_and_react()) {
517 
518  logger.longdebug("Status report requested, initiating");
519 
520  status_report_requested = master_initiate_status_report();
521 
522  } else if ( ctrl_status_report->periodic_interval > 0 && ( now > ctrl_status_report->next_report_time ) ) {
523 
524  // enough time has passed since last status report, generate new one
525  logger.longdebug("Time for a new status report, initiating");
526 
527  // Mark when next report should be generated (to stay +/- precise with
528  // respect to given periodic_interval)
529  ctrl_status_report->next_report_time = now + std::chrono::duration_cast<StdClockType::duration>(
530  std::chrono::milliseconds(ctrl_status_report->periodic_interval)
531  );
532 
533  status_report_requested = master_initiate_status_report();
534  }
535 
536  }
537 
538  master_regular_worker_monitoring();
539 
540  } else {
541 
542  // normal worker, not master -- just check for interrupt or status reports
543  // from master (source rank == 0)
544 
545  logger.longdebug("Checking for MPI messages from master");
546 
547  auto maybeorderinterruptmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
548  if (maybeorderinterruptmsg) {
549  // got an order -- react
550  auto msg = *maybeorderinterruptmsg;
551  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_INTERRUPT);
552  tomographer_assert(msg.source() == 0);
553  logger.longdebug("Receiving an interrupt order from master ... ");
554  comm.recv(msg.source(), msg.tag());
555 
556  // no error message, error message already set in other process
557  throw interrupt_tasks("");
558  }
559 
560  auto maybeorderstatreportmsg = comm.iprobe(0, TAG_MASTER_ORDER_STATUS_REPORT);
561  if (maybeorderstatreportmsg) {
562  // got an order -- react
563  auto msg = *maybeorderstatreportmsg;
564  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_STATUS_REPORT);
565  tomographer_assert(msg.source() == 0);
566  logger.longdebug("Receiving an status report order from master ... ");
567  comm.recv(msg.source(), msg.tag());
568 
569  // we need to send in a status report.
570  status_report_requested = true;
571  }
572 
573  }
574 
575  logger.longdebug("status_report_requested = %c", status_report_requested?'Y':'n') ;
576 
577  return status_report_requested;
578  }
579 
580  inline void submit_status_report(const TaskStatusReportType &statreport)
581  {
582  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
583  if (is_master) {
584  // just handle our own status report
585  logger.longdebug("Master here: handling our own status report");
586  master_handle_incoming_worker_status_report(0, &statreport);
587  } else {
588  logger.longdebug("Sending status report to master");
589  // with isend() we would have to monitor the MPI request (return value) I guess...
590  comm.send(0, // report to master
591  TAG_WORKER_SUBMIT_STATUS_REPORT,
592  statreport) ;
593  }
594  }
595 
596  inline void submit_idle_status_report()
597  {
598  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
599  if (is_master) {
600  master_handle_incoming_worker_status_report(0, NULL);
601  } else {
602  logger.longdebug("Sending idle status report to master");
603  comm.send(0, // report to master
604  TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT) ;
605  }
606  }
607 
608 
609  inline bool master_initiate_status_report()
610  {
611  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
612  tomographer_assert(is_master) ;
613 
614  if (ctrl_status_report->in_preparation) {
615  logger.longdebug("Skipping this report, we're still working on the previous one");
616  return false;
617  }
618 
619  ctrl_status_report->in_preparation = true;
620  ctrl_status_report->num_reports_waiting = 0;
621  ctrl_status_report->full_report = FullStatusReportType();
622  ctrl_status_report->full_report.num_completed = ctrl->num_tasks_completed;
623  ctrl_status_report->full_report.num_total_runs = ctrl->num_total_runs;
624  ctrl_status_report->full_report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
625  StdClockType::now() - ctrl->tasks_start_time
626  ).count() * 1e-3;
627  std::size_t num_workers = (std::size_t)comm.size();
628  // initialize task-specific reports
629  // fill our lists with default-constructed values & set all running to false.
630  ctrl_status_report->full_report.workers_running.clear();
631  ctrl_status_report->full_report.workers_reports.clear();
632  ctrl_status_report->full_report.workers_running.resize(num_workers, false);
633  ctrl_status_report->full_report.workers_reports.resize(num_workers);
634 
635  // order all workers to report on their status
636  for (int worker_id = 1; worker_id < comm.size(); ++worker_id) {
637  if (ctrl->workers_running[(std::size_t)worker_id]) {
638  // if this worker is running, send it a status report order
639  comm.send(worker_id, TAG_MASTER_ORDER_STATUS_REPORT) ;
640  ++ ctrl_status_report->num_reports_waiting;
641  }
642  }
643  if (ctrl->workers_running[0]) {
644  // make num_reports_waiting account for master's own status report
645  ++ ctrl_status_report->num_reports_waiting;
646  }
647 
648  return true;
649  }
650 
651  inline void master_order_interrupt()
652  {
653  tomographer_assert(is_master) ;
654 
655  // we should be called in reaction to a requested interrupt
656  tomographer_assert(ctrl->interrupt_requested) ;
657 
658  // We stopped because an interrupt exception was caught in the master
659  // process, or a worker reported to the master process that they had failed.
660  // Order all workers to stop now.
661  for (int k = 1; k < comm.size(); ++k) {
662  if (ctrl->workers_running[(std::size_t)k]) {
663  comm.send(k, TAG_MASTER_ORDER_INTERRUPT) ;
664  }
665  }
666  }
667 
668  inline void master_regular_worker_monitoring()
669  {
670  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
671  tomographer_assert(is_master) ;
672 
673  // process at most a number of messages per message type equal to comm.size()
674  int n;
675 
676  // see if we have to deliver a new task to someone
677  n = comm.size();
678  do {
679  auto maybenewtaskmsg = comm.iprobe(mpi::any_source, TAG_WORKER_REQUEST_NEW_TASK_ID);
680  if (maybenewtaskmsg) {
681  logger.longdebug("Treating a new task id request message ... ");
682  auto msg = *maybenewtaskmsg;
683  tomographer_assert(msg.tag() == TAG_WORKER_REQUEST_NEW_TASK_ID);
684  comm.recv(msg.source(), msg.tag());
685 
686  // send the worker a new task id
687  TaskCountIntType task_id = master_get_new_task_id(msg.source());
688  comm.send(msg.source(), TAG_MASTER_DELIVER_NEW_TASK_ID, task_id);
689  --n;
690  } else {
691  n = 0;
692  }
693  } while (n > 0);
694 
695  // see if there is any task results incoming of tasks which have finished
696  n = comm.size();
697  do {
698  auto mayberesultmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_RESULT);
699  if (mayberesultmsg) { // got something
700  logger.longdebug("Treating a result message ... ");
701  auto msg = *mayberesultmsg;
702  tomographer_assert(msg.tag() == TAG_WORKER_SUBMIT_RESULT);
703 
704  FullTaskResult * result = new FullTaskResult;
705  logger.longdebug("Receiving a worker's result from #%d ... ", msg.source());
706  comm.recv(msg.source(), msg.tag(), *result);
707 
708  logger.longdebug("Got result.");
709 
710  master_store_task_result(msg.source(), result);
711  --n;
712  } else {
713  n = 0;
714  }
715  } while (n > 0);
716 
717  // see if there is any task results incoming
718  n = comm.size();
719  do {
720  auto maybestatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_STATUS_REPORT);
721  if (maybestatmsg) { // got something
722  logger.longdebug("Treating a status report message ... ");
723  auto msg = *maybestatmsg;
724  tomographer_assert(msg.tag() == TAG_WORKER_SUBMIT_STATUS_REPORT);
725 
726  TaskStatusReportType stat;
727  logger.longdebug("Receiving a worker's status report from #%d ... ", msg.source());
728  comm.recv(msg.source(), msg.tag(), stat);
729 
730  master_handle_incoming_worker_status_report(msg.source(), &stat);
731  --n;
732  } else {
733  n = 0;
734  }
735  } while (n > 0);
736 
737  // see if there is any task results incoming reporting idle
738  n = comm.size();
739  do {
740  auto maybeistatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
741  if (maybeistatmsg) { // got something
742  logger.longdebug("Treating a status report message ... ");
743  auto msg = *maybeistatmsg;
744  tomographer_assert(msg.tag() == TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
745 
746  logger.longdebug("Receiving a worker's idle status report from #%d ... ", msg.source());
747  comm.recv(msg.source(), msg.tag());
748 
749  master_handle_incoming_worker_status_report(msg.source(), NULL);
750  --n;
751  } else {
752  n = 0;
753  }
754  } while (n > 0);
755 
756  // worker finished
757  n = comm.size();
758  do {
759  auto mmsg = comm.iprobe(mpi::any_source, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
760  if (mmsg) { // got something
761  logger.longdebug("Treating a worker's farewell message ... ");
762  auto msg = *mmsg;
763  tomographer_assert(msg.tag() == TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
764 
765  comm.recv(msg.source(), msg.tag());
766  logger.debug("Received worker #%d's farewell message. Bye, you did a great job!", msg.source());
767 
768  // we got one less worker running
769  -- ctrl->num_workers_running;
770 
771  --n;
772  } else {
773  n = 0;
774  }
775  } while (n > 0);
776 
777  }
778 
779  inline TaskCountIntType master_get_new_task_id(int worker_id)
780  {
781  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
782 
783  tomographer_assert(is_master) ;
784 
785  TaskCountIntType task_id = ctrl->pop_task();
786 
787  if (task_id >= 0) {
788  // worker got new task_id, will start running again
789  ctrl->workers_running[(std::size_t)worker_id] = 1;
790  }
791 
792  logger.debug([&](std::ostream & stream) {
793  stream << "Got new task_id = " << task_id << " for worker #" << worker_id;
794  }) ;
795 
796  logger.longdebug([&](std::ostream & stream) {
797  stream << "num_workers_running now = " << ctrl->num_workers_running
798  << ", all workers_running = ";
799  for (std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
800  if (k > 0) {
801  stream << ", ";
802  }
803  stream << ctrl->workers_running[k];
804  }
805  }) ;
806 
807  return task_id;
808  }
809 
810  // we're stealing the pointer here
811  inline void master_store_task_result(int worker_id, FullTaskResult * result)
812  {
813  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
814  tomographer_assert(is_master) ;
815 
816  tomographer_assert(result != NULL) ;
817 
818  TaskCountIntType task_id = result->task_id;
819 
820  logger.debug([&](std::ostream & stream) {
821  stream << "Got result from #" << worker_id << ", task_id=" << task_id;
822  }) ;
823 
824  tomographer_assert(task_id >= 0 && (std::size_t)task_id < ctrl->full_task_results.size()) ;
825  tomographer_assert(ctrl->full_task_results.size() == ctrl->task_results.size()) ;
826 
827  ctrl->full_task_results[(std::size_t)task_id] = result;
828  ctrl->task_results[(std::size_t)task_id] = result->task_result;
829 
830  ++ ctrl->num_tasks_completed;
831 
832  // this worker is now (momentarily) no longer working
833  ctrl->workers_running[(std::size_t)worker_id] = 0;
834 
835  logger.longdebug([&](std::ostream & stream) {
836  stream << "num_workers_running now = " << ctrl->num_workers_running
837  << ", all workers_running = ";
838  for (std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
839  if (k > 0) {
840  stream << ", ";
841  }
842  stream << ctrl->workers_running[k];
843  }
844  }) ;
845 
846  if (result->error_msg.size()) {
847  // task ran into an error, we should interrupt everything. This flag will
848  // be picked up by us later.
849  ctrl->interrupt_requested = true;
850  }
851 
852  logger.debug("Saved into results.");
853  }
854 
855  inline void master_handle_incoming_worker_status_report(int worker_id, const TaskStatusReportType * stat)
856  {
857  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
858  tomographer_assert(is_master) ;
859 
860  logger.longdebug("incoming report from worker_id=%d", worker_id);
861 
862  --ctrl_status_report->num_reports_waiting;
863 
864  if (stat != NULL) {
865  ctrl_status_report->full_report.workers_running[(std::size_t)worker_id] = 1;
866  ctrl_status_report->full_report.workers_reports[(std::size_t)worker_id] = *stat;
867  } else {
868  // leave the data in the workers_[running|reports] to zero
869  // value/default-initialized, meaning that the worker is in an IDLE state
870  ctrl_status_report->full_report.workers_running[(std::size_t)worker_id] = 0;
871  }
872 
873  if (ctrl_status_report->num_reports_waiting <= 0) {
874  // report is ready, send it.
875  logger.longdebug("Status report is ready to be sent");
876  if (ctrl_status_report->user_fn) {
877  logger.longdebug("Calling status report user function");
878  ctrl_status_report->user_fn(ctrl_status_report->full_report) ;
879  }
880  // reset data
881  ctrl_status_report->reset();
882  logger.longdebug("Status report finished");
883  }
884  }
885 
886 
887  inline void run_worker()
888  {
889  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
890 
891  auto worker_id = comm.rank();
892 
893  TaskCountIntType new_task_id = -1;
894  for (;;) {
895 
896  if (is_master) {
897  // we can get our task id directly
898  new_task_id = master_get_new_task_id(0);
899  logger.debug([&](std::ostream & stream) {
900  stream << "Master worker: got new task id = " << new_task_id ;
901  });
902  } else {
903  logger.debug("Requesting a new task id from master") ;
904  // ask our master for a new task
905  comm.send(0, // destination: master
906  TAG_WORKER_REQUEST_NEW_TASK_ID);
907  comm.recv(0, // from master
908  TAG_MASTER_DELIVER_NEW_TASK_ID,
909  new_task_id);
910  logger.debug([&](std::ostream & stream) {
911  stream << "Worker #" << worker_id << ": got new task id = " << new_task_id ;
912  });
913  }
914 
915  if (new_task_id < 0) {
916  // we're done, we shouldn't run any more tasks
917  break;
918  }
919 
920  // do some bookkeeping here as well -- just in case the task doesn't call
921  // status-report-handling stuff (even though it's supposed to, you can't
922  // ever trust those classes)
923  bool report_requested = check_mpi_messages_and_get_statusrequested(true);
924  if (report_requested) {
925  submit_idle_status_report();
926  }
927 
928  // run the given task
929  run_task(new_task_id);
930  }
931 
932  logger.debug("Worker #%d done treating tasks.", worker_id) ;
933 
934  if (!is_master) {
935  // see if there are any stray interrupt orders
936  auto mmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
937  if (mmsg) {
938  // got an order -- ignore
939  auto msg = *mmsg;
940  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_INTERRUPT);
941  tomographer_assert(msg.source() == 0);
942  logger.longdebug("Receiving (belatedly) an interrupt order from master ... ");
943  comm.recv(msg.source(), msg.tag());
944 
945  // no error message, error message already set in other process
946  throw interrupt_tasks("");
947  }
948  }
949  }
950 
951  inline void run_task(TaskCountIntType task_id)
952  {
953  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
954 
955  std::string error_msg;
956 
957  TaskResultType * task_result = NULL;
958 
959  try {
960 
961  auto input = pcdata->getTaskInput(task_id);
962 
963  // construct a new task instance
964  TaskType t(std::move(input), pcdata, logger.parentLogger());
965 
966  // and run it
967  t.run(pcdata, logger.parentLogger(), &mgriface);
968 
969  // and collect result.
970  task_result = new TaskResultType(t.stealResult());
971 
972  } catch (interrupt_tasks & e) {
973 
974  error_msg = e.what();
975 
976  } catch (...) {
977 
978  error_msg = std::string("Exception in task: ") +
979  boost::diagnostic_information(boost::current_exception());
980 
981  }
982 
983  // collect the task result, and gather it to the master process
984 
985  if (is_master) {
986 
987  FullTaskResult * fullresult = new FullTaskResult(task_id, task_result, error_msg);
988 
989  // just store our own result
990  master_store_task_result(0, fullresult);
991 
992  } else {
993 
994  int worker_id = comm.rank();
995 
996  // all good, task done. Send our result to the master process.
997  logger.debug("worker #%d done here, sending result to master", worker_id);
998 
999  FullTaskResult wresult(task_id, task_result, error_msg);
1000 
1001  comm.send(0, // destination
1002  TAG_WORKER_SUBMIT_RESULT,
1003  wresult);
1004 
1005  // we won't be needing this any longer, it's been serialized & transmitted
1006  // to master
1007  if (task_result != NULL) {
1008  delete task_result;
1009  }
1010 
1011  // make sure there is no pending status report order which we could pick up
1012  // when starting the next task
1013  //
1014  bool report_requested = check_mpi_messages_and_get_statusrequested(true);
1015  if (report_requested) {
1016  submit_idle_status_report();
1017  }
1018 
1019  }
1020 
1021  if (error_msg.size()) {
1022  throw interrupt_tasks(error_msg);
1023  }
1024  }
1025 
1026 
1027 public:
1028 
1037  inline void setMinMPIProbeDt(int milliseconds)
1038  {
1039  tomographer_assert(is_master);
1040  min_mpi_probe_dt = milliseconds;
1041  }
1042 
1047  inline bool isMaster() const { return is_master; }
1048 
1053  inline TaskCountIntType numTaskRuns() const
1054  {
1055  tomographer_assert(is_master);
1056  tomographer_assert(ctrl != NULL);
1057  return ctrl->num_total_runs;
1058  }
1059 
1066  {
1067  tomographer_assert(is_master);
1068  return ctrl->task_results;
1069  }
1070 
1076  inline const TaskResultType & collectedTaskResult(std::size_t k) const
1077  {
1078  tomographer_assert(is_master);
1079  tomographer_assert(ctrl != NULL) ;
1080  tomographer_assert(k >= 0 && k < ctrl->task_results.size()) ;
1081  return *ctrl->task_results[k];
1082  }
1083 
1084 
1097  template<typename Fn>
1098  inline void setStatusReportHandler(Fn fnstatus)
1099  {
1100  tomographer_assert(is_master);
1101  ctrl_status_report->user_fn = fnstatus;
1102  }
1103 
1117  inline void requestStatusReport()
1118  {
1119  tomographer_assert(is_master);
1120 
1121  ++ ctrl_status_report->event_counter; // will be picked up by master worker
1122  }
1123 
1133  template<typename IntType>
1134  inline void requestPeriodicStatusReport(IntType milliseconds)
1135  {
1136  tomographer_assert(is_master);
1137 
1138  ctrl_status_report->periodic_interval = milliseconds; // will be picked up by master worker
1139  }
1140 
1149  inline void requestInterrupt()
1150  {
1151  tomographer_assert(is_master);
1152 
1153  ctrl->interrupt_requested = 1; // will be picked up by master worker
1154  }
1155 
1156 }; // class TaskDispatcher
1157 
1158 
1159 
1160 template<typename TaskType_, typename TaskCData_,
1161  typename BaseLoggerType_, typename TaskCountIntType_ = int>
1163 mkTaskDispatcher(TaskCData_ * pcdata_, mpi::communicator & comm_, BaseLoggerType_ & baselogger_,
1164  TaskCountIntType_ num_total_runs_)
1165 {
1167  pcdata_, comm_, baselogger_, num_total_runs_
1168  );
1169 }
1170 
1171 
1172 
1173 } // namespace Sequential
1174 } // namespace MultiProc
1175 } // namespace Tomographer
1176 
1177 
1178 #endif
Utilities for formatting strings.
void requestInterrupt()
Interrupt all tasks as soon as possible.
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
bool isMaster() const
Whether we are the master process.
Base namespace for the Tomographer project.
Definition: densellh.h:45
void setMinMPIProbeDt(int milliseconds)
Tune the frequency at which MPI messages are probed.
T to_string(T... args)
STL namespace.
T duration_cast(T... args)
void setStatusReportHandler(Fn fnstatus)
assign a callable to be called whenever a status report is requested
TaskDispatcher(TaskCData *pcdata_, mpi::communicator &comm_, BaseLoggerType &logger_, int num_task_runs)
Construct the task dispatcher around the given MPI communicator.
Definition: multiprocmpi.h:355
#define TOMO_ORIGIN
Use this as argument for a Tomographer::Logger::LocalLogger constructor .
Definition: loggers.h:1608
LocalLogger< LocalLogger< BaseLoggerType > > subLogger(std::string new_prefix)
Create a sub-logger.
Definition: loggers.h:1846
STL class.
Handles parallel execution of tasks using MPI.
Definition: multiprocmpi.h:133
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
#define TOMO_STATIC_ASSERT_EXPR(...)
Tool for static assertions without message.
Definition: cxxdefs.h:77
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
STL class.
T move(T... args)
void requestStatusReport()
Request a status report.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
Some common definitions for multiprocessing interfaces.
T c_str(T... args)
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
STL class.
#define tomographer_assert(...)
Assertion test macro.
Definition: cxxdefs.h:84
A complete status report, abstract version.
Definition: multiproc.h:97