Loading [MathJax]/jax/output/HTML-CSS/config.js
Tomographerv5.3
Tomographer C++ Framework Documentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
multiprocmpi.h
Go to the documentation of this file.
1 /* This file is part of the Tomographer project, which is distributed under the
2  * terms of the MIT license.
3  *
4  * The MIT License (MIT)
5  *
6  * Copyright (c) 2016 ETH Zurich, Institute for Theoretical Physics, Philippe Faist
7  * Copyright (c) 2017 Caltech, Institute for Quantum Information and Matter, Philippe Faist
8  *
9  * Permission is hereby granted, free of charge, to any person obtaining a copy
10  * of this software and associated documentation files (the "Software"), to deal
11  * in the Software without restriction, including without limitation the rights
12  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13  * copies of the Software, and to permit persons to whom the Software is
14  * furnished to do so, subject to the following conditions:
15  *
16  * The above copyright notice and this permission notice shall be included in
17  * all copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  */
27 
28 #ifndef MPI_MULTIPROCMPI_H
29 #define MPI_MULTIPROCMPI_H
30 
31 #include <csignal>
32 
33 #include <string>
34 #include <chrono>
35 #include <exception>
36 #include <algorithm>
37 
38 #include <boost/exception_ptr.hpp>
39 #include <boost/exception/diagnostic_information.hpp>
40 #include <boost/serialization/serialization.hpp>
41 
42 #include <tomographer/tools/fmt.h>
44 #include <tomographer/multiproc.h>
45 #include <tomographer/multiprocthreadcommon.h> // TOMOGRAPHER_SLEEP_FOR_MS()
46 
47 #include <boost/mpi/environment.hpp>
48 #include <boost/mpi/communicator.hpp>
49 #include <boost/mpi/collectives.hpp>
50 
51 namespace mpi = boost::mpi;
52 
53 
54 
63 namespace Tomographer {
64 
65 namespace MultiProc {
66 
67 namespace MPI {
68 
97 template<typename TaskType_, typename TaskCData_, typename BaseLoggerType_,
98  typename TaskCountIntType_ = int>
99 class TOMOGRAPHER_EXPORT TaskDispatcher
100 {
101 public:
102  typedef TaskType_ TaskType;
103  typedef typename TaskType::StatusReportType TaskStatusReportType;
104  typedef TaskCData_ TaskCData;
105  typedef BaseLoggerType_ BaseLoggerType;
106  typedef TaskCountIntType_ TaskCountIntType;
107 
108  typedef typename TaskType::ResultType TaskResultType;
109 
111 
113 
115 
116 private:
117 
119 
120  struct FullTaskResult
121  {
122  FullTaskResult(TaskCountIntType task_id_ = -1, TaskResultType * t = NULL,
123  std::string errmsg = std::string())
124  : task_id(task_id_), task_result(t), error_msg(errmsg) { }
125 
126  TaskCountIntType task_id;
127  TaskResultType * task_result; // NULL if task was interrupted
128  std::string error_msg;
129 
130  private:
131  friend boost::serialization::access;
132  template<typename Archive>
133  void serialize(Archive & a, unsigned int /*version*/)
134  {
135  a & task_id;
136  a & task_result;
137  a & error_msg;
138  }
139  };
140 
141  struct MasterWorkersController {
142  MasterWorkersController(TaskCountIntType num_total_runs_)
143  : num_total_runs(num_total_runs_),
144  num_tasks_completed(0),
145  num_tasks_launched(0),
146  num_workers_running(0),
147  workers_running(),
148  tasks_start_time(),
149  interrupt_requested(0),
150  interrupt_reacted(0),
151  full_task_results(),
152  task_results()
153  {
154  }
155 
156  inline void start(int num_workers)
157  {
158  // everything is left to do
159  num_tasks_completed = 0;
160  num_tasks_launched = 0;
161  num_workers_running = num_workers;
162  workers_running.resize((std::size_t)num_workers, 0);
163  // interruption flag
164  interrupt_requested = 0;
165  interrupt_reacted = 0;
166  // resize to # of processes
167  full_task_results.resize((std::size_t)num_total_runs, NULL);
168  task_results.resize((std::size_t)num_total_runs, NULL);
169  // et ... top chrono
170  tasks_start_time = StdClockType::now();
171  }
172 
173  inline TaskCountIntType pop_task()
174  {
175  if (num_tasks_launched >= num_total_runs) {
176  return -1;
177  }
178  TaskCountIntType task_id = num_tasks_launched;
179  ++num_tasks_launched;
180  return task_id; // or, as a C guy would say, return num_tasks_launched++
181  }
182 
183  bool get_interrupt_event_and_react()
184  {
185  if (interrupt_requested) {
186  interrupt_reacted = 1;
187  return true;
188  }
189  return false;
190  }
191 
192  const TaskCountIntType num_total_runs;
193  TaskCountIntType num_tasks_completed;
194  TaskCountIntType num_tasks_launched;
195 
196  int num_workers_running;
197  std::vector<int> workers_running;
198 
199  StdClockType::time_point tasks_start_time;
200 
201  volatile std::sig_atomic_t interrupt_requested; // could be written to by signal handler
202  std::sig_atomic_t interrupt_reacted;
203 
204  std::vector<FullTaskResult*> full_task_results;
205  std::vector<TaskResultType*> task_results; // for convenience, same as full_task_results[k].task_result's
206  };
207 
208  struct MasterStatusReportController {
209  MasterStatusReportController()
210  : event_counter(0),
211  reacted_event_counter(0),
212  in_preparation(false),
213  full_report(),
214  num_reports_waiting(0),
215  last_report_time(StdClockType::now()),
216  user_fn(),
217  periodic_interval(0)
218  {
219  }
220 
221  bool get_event_and_react() {
222  if (in_preparation) {
223  return false;
224  }
225  if (event_counter != reacted_event_counter) {
226  reacted_event_counter = event_counter;
227  return true;
228  }
229  return false;
230  }
231 
232  void reset() {
233  in_preparation = false;
234  full_report = FullStatusReportType();
235  num_reports_waiting = 0;
236  }
237 
238  volatile std::sig_atomic_t event_counter; // could be written to by signal handler
239  std::sig_atomic_t reacted_event_counter;
240 
241  bool in_preparation;
242 
243  FullStatusReportType full_report;
244  int num_reports_waiting;
245 
246  StdClockType::time_point last_report_time;
247 
248  FullStatusReportCallbackType user_fn;
249  int periodic_interval;
250  };
251 
252  struct TaskMgrIface {
253  TaskMgrIface(TaskDispatcher * dispatcher_)
254  : dispatcher(dispatcher_)
255  {
256  tomographer_assert(dispatcher != NULL) ;
257  }
258 
259  inline bool statusReportRequested() const
260  {
261  return dispatcher->do_bookkeeping();
262  }
263 
264  inline void submitStatusReport(const TaskStatusReportType &statreport)
265  {
266  dispatcher->submit_status_report(statreport);
267  }
268 
269  private:
270  TaskDispatcher * dispatcher;
271  friend class TaskDispatcher;
272  };
273 
274  enum {
275  _TAG_offset_number = 199, // our tag #s start from 200
276 
277  TAG_WORKER_REQUEST_NEW_TASK_ID,
278  TAG_MASTER_DELIVER_NEW_TASK_ID,
279 
280  TAG_MASTER_ORDER_INTERRUPT,
281  TAG_MASTER_ORDER_STATUS_REPORT,
282 
283  TAG_WORKER_SUBMIT_STATUS_REPORT,
284  TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT,
285  TAG_WORKER_SUBMIT_RESULT,
286 
287  TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE
288  };
289 
290  TaskCData * pcdata;
291  mpi::communicator & comm;
292  const bool is_master;
293 
295 
296  MasterWorkersController * ctrl;
297  MasterStatusReportController * ctrl_status_report;
298 
299  TaskMgrIface mgriface;
300 
301  class interrupt_tasks : public std::exception
302  {
303  std::string msg;
304  public:
305  interrupt_tasks(std::string msg_ = "Task Interrupted") : msg(msg_) { }
306  virtual ~interrupt_tasks() throw() { }
307  const char * what() const throw() { return msg.c_str(); }
308  };
309 
310 public:
318  TaskDispatcher(TaskCData * pcdata_, mpi::communicator & comm_, BaseLoggerType & logger_, int num_task_runs)
319  : pcdata(pcdata_),
320  comm(comm_),
321  is_master(comm_.rank() == 0),
322  llogger("Tomographer::MultiProc::MPI::TaskDispatcher", logger_),
323  ctrl(NULL),
324  ctrl_status_report(NULL),
325  mgriface(this)
326  {
327  if (is_master) {
328  ctrl = new MasterWorkersController(num_task_runs) ;
329  ctrl_status_report = new MasterStatusReportController;
330  }
331  }
332  ~TaskDispatcher()
333  {
334  if (ctrl != NULL) {
335  for ( auto r : ctrl->full_task_results ) {
336  if (r != NULL) {
337  if (r->task_result != NULL) {
338  delete r->task_result;
339  }
340  delete r;
341  }
342  }
343  }
344  }
345 
349  void run()
350  {
351  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
352 
353  // First, the pcdata should have been initialized by the master only. Our
354  // first job is to broadcast the data to all processes.
355  if (is_master) {
356  tomographer_assert(pcdata != NULL) ;
357  } else {
358  tomographer_assert(pcdata == NULL) ;
359  }
360  // broadcast pcdata...
361  mpi::broadcast(comm, pcdata, 0);
362  // now, pcdata is initialized everywhere.
363 
364  logger.longdebug("pcdata is now broadcast; is_master=%c", (is_master?'y':'n'));
365 
366  if (is_master) {
367  tomographer_assert(ctrl != NULL) ;
368  tomographer_assert(ctrl_status_report != NULL) ;
369  ctrl->start(comm.size());
370  } else {
371  tomographer_assert(ctrl == NULL) ;
372  tomographer_assert(ctrl_status_report == NULL) ;
373  }
374 
375  const auto worker_id = comm.rank();
376 
377  logger.debug([&](std::ostream & stream) {
378  stream << "Worker #" << worker_id << " up and running ...";
379  });
380 
381  bool interrupted = false;
382  std::string error_msg;
383 
384  try {
385 
386  run_worker();
387 
388  } catch (interrupt_tasks & e) {
389  // tasks were interrupted, sorry.
390  interrupted = true;
391  error_msg = e.what();
392  }
393 
394  //
395  // gather the results to the master process
396  //
397  if (is_master) {
398 
399  logger.debug("master done here, waiting for other processes to finish");
400 
401  // we stopped working ourselves
402  -- ctrl->num_workers_running;
403 
404  while ( ctrl->num_workers_running ) {
405  // continue monitoring running processes and gathering results
406  logger.longdebug("num_workers_running = %d", ctrl->num_workers_running);
407  try {
408  do_bookkeeping();
409  } catch(interrupt_tasks & e) {
410  // nothing to do but wait for others to finish...
411  }
412  // and sleep a bit
413  TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
414  }
415 
416  for (std::size_t k = 0; k < ctrl->full_task_results.size(); ++k) {
417  FullTaskResult * r = ctrl->full_task_results[k];
418  if (r->error_msg.size()) {
419  error_msg += "\nIn Worker #" + std::to_string(k) + ":\n" + r->error_msg;
420  }
421  }
422 
423  } else {
424 
425  // Notify master that we're outta here
426  comm.send(0, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
427 
428  }
429 
430  if (interrupted) {
431  throw TasksInterruptedException(error_msg);
432  }
433 
434  // all done
435  logger.debug("all done");
436  }
437 
438 private:
439  friend struct TaskMgrIface;
440 
441  inline bool do_bookkeeping() // return TRUE if a status report was requested
442  {
443  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
444 
445  bool status_report_requested = false;
446 
447  if (is_master) {
448 
449  if (ctrl->get_interrupt_event_and_react()) {
450 
451  master_order_interrupt();
452  throw interrupt_tasks();
453 
454  }
455 
456  if (!ctrl->interrupt_requested) {
457 
458  // as master, we could still be asked to do some monitoring after an
459  // interrupt was requested, so be careful about this.
460 
461  if ( ctrl_status_report->get_event_and_react()) {
462 
463  logger.longdebug("Status report requested, initiating");
464 
465  status_report_requested = master_initiate_status_report();
466 
467  } else if ( ctrl_status_report->periodic_interval > 0 &&
468  ( std::chrono::duration_cast<std::chrono::milliseconds>(
469  StdClockType::now() - ctrl_status_report->last_report_time
470  ).count() > ctrl_status_report->periodic_interval ) ) {
471 
472  // enough time has passed since last status report, generate new one
473 
474  logger.longdebug("Time for a new status report, initiating");
475 
476  status_report_requested = master_initiate_status_report();
477 
478  }
479 
480  }
481 
482  master_regular_worker_monitoring();
483 
484  } else {
485 
486  // normal worker -- just check for interrupt or status reports from master
487  // (source rank == 0)
488 
489  auto maybeorderinterruptmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
490  if (maybeorderinterruptmsg) {
491  // got an order -- react
492  auto msg = *maybeorderinterruptmsg;
493  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_INTERRUPT);
494  tomographer_assert(msg.source() == 0);
495  logger.longdebug("Receiving an interrupt order from master ... ");
496  comm.recv(msg.source(), msg.tag());
497 
498  // no error message, error message already set in other process
499  throw interrupt_tasks("");
500  }
501 
502  auto maybeorderstatreportmsg = comm.iprobe(0, TAG_MASTER_ORDER_STATUS_REPORT);
503  if (maybeorderstatreportmsg) {
504  // got an order -- react
505  auto msg = *maybeorderstatreportmsg;
506  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_STATUS_REPORT);
507  tomographer_assert(msg.source() == 0);
508  logger.longdebug("Receiving an status report order from master ... ");
509  comm.recv(msg.source(), msg.tag());
510 
511  // we need to send in a status report.
512  status_report_requested = true;
513 
514  }
515 
516  }
517 
518  return status_report_requested;
519  }
520 
521  inline void submit_status_report(const TaskStatusReportType &statreport)
522  {
523  if (is_master) {
524  // just handle our own status report
525  master_handle_incoming_worker_status_report(0, &statreport);
526  } else {
527  // with isend() we would have to monitor the MPI request (return value) I guess...
528  comm.send(0, // report to master
529  TAG_WORKER_SUBMIT_STATUS_REPORT,
530  statreport) ;
531  }
532  }
533 
534 
535  inline bool master_initiate_status_report()
536  {
537  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
538  tomographer_assert(is_master) ;
539 
540  if (ctrl_status_report->in_preparation) {
541  logger.longdebug("Skipping this report, we're still working on the previous one");
542  return false;
543  }
544 
545  ctrl_status_report->in_preparation = true;
546  ctrl_status_report->num_reports_waiting = 0;
547  ctrl_status_report->full_report = FullStatusReportType();
548  ctrl_status_report->full_report.num_completed = ctrl->num_tasks_completed;
549  ctrl_status_report->full_report.num_total_runs = ctrl->num_total_runs;
550  ctrl_status_report->full_report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
551  StdClockType::now() - ctrl->tasks_start_time
552  ).count() * 1e-3;
553  std::size_t num_workers = (std::size_t)comm.size();
554  // initialize task-specific reports
555  // fill our lists with default-constructed values & set all running to false.
556  ctrl_status_report->full_report.workers_running.clear();
557  ctrl_status_report->full_report.workers_reports.clear();
558  ctrl_status_report->full_report.workers_running.resize(num_workers, false);
559  ctrl_status_report->full_report.workers_reports.resize(num_workers);
560 
561  // order all workers to report on their status
562  for (int worker_id = 1; worker_id < comm.size(); ++worker_id) {
563  if (ctrl->workers_running[(std::size_t)worker_id]) {
564  // if this worker is running, send it a status report order
565  comm.send(worker_id, TAG_MASTER_ORDER_STATUS_REPORT) ;
566  ++ ctrl_status_report->num_reports_waiting;
567  }
568  }
569  if (ctrl->workers_running[0]) {
570  // make num_reports_waiting account for master's own status report
571  ++ ctrl_status_report->num_reports_waiting;
572  }
573 
574  return true;
575  }
576 
577  inline void master_order_interrupt()
578  {
579  tomographer_assert(is_master) ;
580 
581  // we should be called in reaction to a requested interrupt
582  tomographer_assert(ctrl->interrupt_requested) ;
583 
584  // We stopped because an interrupt exception was caught in the master
585  // process, or a worker reported to the master process that they had failed.
586  // Order all workers to stop now.
587  for (int k = 1; k < comm.size(); ++k) {
588  if (ctrl->workers_running[(std::size_t)k]) {
589  comm.send(k, TAG_MASTER_ORDER_INTERRUPT) ;
590  }
591  }
592  }
593 
594  inline void master_regular_worker_monitoring()
595  {
596  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
597  tomographer_assert(is_master) ;
598 
599  // see if we have to deliver a new task to someone
600  { auto maybenewtaskmsg = comm.iprobe(mpi::any_source, TAG_WORKER_REQUEST_NEW_TASK_ID);
601  if (maybenewtaskmsg) {
602  logger.longdebug("Treating a new task id request message ... ");
603  auto msg = *maybenewtaskmsg;
604  tomographer_assert(msg.tag() == TAG_WORKER_REQUEST_NEW_TASK_ID);
605  comm.recv(msg.source(), msg.tag());
606 
607  // send the worker a new task id
608  TaskCountIntType task_id = master_get_new_task_id(msg.source());
609  comm.send(msg.source(), TAG_MASTER_DELIVER_NEW_TASK_ID, task_id);
610  }
611  }
612 
613  // see if there is any task results incoming of tasks which have finished
614  { auto mayberesultmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_RESULT);
615  if (mayberesultmsg) { // got something
616  logger.longdebug("Treating a result message ... ");
617  auto msg = *mayberesultmsg;
618  tomographer_assert(msg.tag() == TAG_WORKER_SUBMIT_RESULT);
619 
620  FullTaskResult * result = new FullTaskResult;
621  logger.longdebug("Receiving a worker's result from #%d ... ", msg.source());
622  comm.recv(msg.source(), msg.tag(), *result);
623 
624  logger.longdebug("Got result.");
625 
626  master_store_task_result(msg.source(), result);
627  }
628  }
629 
630  // see if there is any task results incoming
631  { auto maybestatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_STATUS_REPORT);
632  if (maybestatmsg) { // got something
633  logger.longdebug("Treating a status report message ... ");
634  auto msg = *maybestatmsg;
635  tomographer_assert(msg.tag() == TAG_WORKER_SUBMIT_STATUS_REPORT);
636 
637  TaskStatusReportType stat;
638  logger.longdebug("Receiving a worker's status report from #%d ... ", msg.source());
639  comm.recv(msg.source(), msg.tag(), stat);
640 
641  master_handle_incoming_worker_status_report(msg.source(), &stat);
642  }
643  }
644 
645  // see if there is any task results incoming reporting idle
646  { auto maybeistatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
647  if (maybeistatmsg) { // got something
648  logger.longdebug("Treating a status report message ... ");
649  auto msg = *maybeistatmsg;
650  tomographer_assert(msg.tag() == TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
651 
652  logger.longdebug("Receiving a worker's idle status report from #%d ... ", msg.source());
653  comm.recv(msg.source(), msg.tag());
654 
655  master_handle_incoming_worker_status_report(msg.source(), NULL);
656  }
657  }
658 
659  // worker finished
660  { auto mmsg = comm.iprobe(mpi::any_source, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
661  if (mmsg) { // got something
662  logger.longdebug("Treating a worker's farewell message ... ");
663  auto msg = *mmsg;
664  tomographer_assert(msg.tag() == TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
665 
666  comm.recv(msg.source(), msg.tag());
667  logger.debug("Received worker #%d's farewell message. Bye, you did a great job!", msg.source());
668 
669  // we got one less worker running
670  -- ctrl->num_workers_running;
671  }
672  }
673 
674 
675  }
676 
677  inline TaskCountIntType master_get_new_task_id(int worker_id)
678  {
679  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
680 
681  tomographer_assert(is_master) ;
682 
683  TaskCountIntType task_id = ctrl->pop_task();
684 
685  if (task_id >= 0) {
686  // worker got new task_id, will start running again
687  ctrl->workers_running[(std::size_t)worker_id] = 1;
688  }
689 
690  logger.debug([&](std::ostream & stream) {
691  stream << "Got new task_id = " << task_id << " for worker #" << worker_id;
692  }) ;
693 
694  logger.longdebug([&](std::ostream & stream) {
695  stream << "num_workers_running now = " << ctrl->num_workers_running
696  << ", all workers_running = ";
697  for (std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
698  if (k > 0) {
699  stream << ", ";
700  }
701  stream << ctrl->workers_running[k];
702  }
703  }) ;
704 
705  return task_id;
706  }
707 
708  // we're stealing the pointer here
709  inline void master_store_task_result(int worker_id, FullTaskResult * result)
710  {
711  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
712  tomographer_assert(is_master) ;
713 
714  tomographer_assert(result != NULL) ;
715 
716  TaskCountIntType task_id = result->task_id;
717 
718  logger.debug([&](std::ostream & stream) {
719  stream << "Got result from #" << worker_id << ", task_id=" << task_id;
720  }) ;
721 
722  tomographer_assert(task_id >= 0 && (std::size_t)task_id < ctrl->full_task_results.size()) ;
723  tomographer_assert(ctrl->full_task_results.size() == ctrl->task_results.size()) ;
724 
725  ctrl->full_task_results[(std::size_t)task_id] = result;
726  ctrl->task_results[(std::size_t)task_id] = result->task_result;
727 
728  ++ ctrl->num_tasks_completed;
729 
730  // this worker is now (momentarily) no longer working
731  ctrl->workers_running[(std::size_t)worker_id] = 0;
732 
733  logger.longdebug([&](std::ostream & stream) {
734  stream << "num_workers_running now = " << ctrl->num_workers_running
735  << ", all workers_running = ";
736  for (std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
737  if (k > 0) {
738  stream << ", ";
739  }
740  stream << ctrl->workers_running[k];
741  }
742  }) ;
743 
744  if (result->error_msg.size()) {
745  // task ran into an error, we should interrupt everything. This flag will
746  // be picked up by us later.
747  ctrl->interrupt_requested = true;
748  }
749 
750  logger.debug("Saved into results.");
751  }
752 
753  inline void master_handle_incoming_worker_status_report(int worker_id, const TaskStatusReportType * stat)
754  {
755  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
756  tomographer_assert(is_master) ;
757 
758  logger.longdebug("incoming report from worker_id=%d", worker_id);
759 
760  --ctrl_status_report->num_reports_waiting;
761 
762  if (stat != NULL) {
763  ctrl_status_report->full_report.workers_running[(std::size_t)worker_id] = 1;
764  ctrl_status_report->full_report.workers_reports[(std::size_t)worker_id] = *stat;
765  } else {
766  // leave the data in the workers_[running|reports] to zero
767  // value/default-initialized, meaning that the worker is in an IDLE state
768  ctrl_status_report->full_report.workers_running[(std::size_t)worker_id] = 0;
769  }
770 
771  if (ctrl_status_report->num_reports_waiting <= 0) {
772  // report is ready, send it.
773  logger.longdebug("Status report is ready to be sent");
774  if (ctrl_status_report->user_fn) {
775  logger.longdebug("Calling status report user function");
776  ctrl_status_report->user_fn(ctrl_status_report->full_report) ;
777  }
778  // reset data
779  ctrl_status_report->reset();
780  ctrl_status_report->last_report_time = StdClockType::now();
781  logger.longdebug("Status report finished");
782  }
783  }
784 
785 
786  inline void run_worker()
787  {
788  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
789 
790  auto worker_id = comm.rank();
791 
792  TaskCountIntType new_task_id = -1;
793  for (;;) {
794 
795  if (is_master) {
796  // we can get our task id directly
797  new_task_id = master_get_new_task_id(0);
798  logger.debug([&](std::ostream & stream) {
799  stream << "Master worker: got new task id = " << new_task_id ;
800  });
801  } else {
802  logger.debug("Requesting a new task id from master") ;
803  // ask our master for a new task
804  comm.send(0, // destination: master
805  TAG_WORKER_REQUEST_NEW_TASK_ID);
806  comm.recv(0, // from master
807  TAG_MASTER_DELIVER_NEW_TASK_ID,
808  new_task_id);
809  logger.debug([&](std::ostream & stream) {
810  stream << "Worker #" << worker_id << ": got new task id = " << new_task_id ;
811  });
812  }
813 
814  if (new_task_id < 0) {
815  // we're done, we shouldn't run any more tasks
816  break;
817  }
818 
819  // do some bookkeeping here as well -- just in case the task doesn't call
820  // status-report-handling stuff (even though it's supposed to, you can't
821  // ever trust those classes)
822  do_bookkeeping();
823 
824  // run the given task
825  run_task(new_task_id);
826  }
827 
828  logger.debug("Worker #%d done treating tasks.", worker_id) ;
829 
830  if (!is_master) {
831  // see if there are any stray interrupt orders
832  auto mmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
833  if (mmsg) {
834  // got an order -- ignore
835  auto msg = *mmsg;
836  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_INTERRUPT);
837  tomographer_assert(msg.source() == 0);
838  logger.longdebug("Receiving (belatedly) an interrupt order from master ... ");
839  comm.recv(msg.source(), msg.tag());
840 
841  // no error message, error message already set in other process
842  throw interrupt_tasks("");
843  }
844  }
845  }
846 
847  inline void run_task(TaskCountIntType task_id)
848  {
849  auto logger = llogger.subLogger(TOMO_ORIGIN) ;
850 
851  std::string error_msg;
852 
853  TaskResultType * task_result = NULL;
854 
855  try {
856 
857  auto input = pcdata->getTaskInput(task_id);
858 
859  // construct a new task instance
860  TaskType t(std::move(input), pcdata, logger.parentLogger());
861 
862  // and run it
863  t.run(pcdata, logger.parentLogger(), &mgriface);
864 
865  // and collect result.
866  task_result = new TaskResultType(t.stealResult());
867 
868  } catch (interrupt_tasks & e) {
869 
870  error_msg = e.what();
871 
872  } catch (...) {
873 
874  error_msg = std::string("Exception in task: ") +
875  boost::diagnostic_information(boost::current_exception());
876 
877  }
878 
879  // collect the task result, and gather it to the master process
880 
881  if (is_master) {
882 
883  FullTaskResult * fullresult = new FullTaskResult(task_id, task_result, error_msg);
884 
885  // just store our own result
886  master_store_task_result(0, fullresult);
887 
888  } else {
889 
890  int worker_id = comm.rank();
891 
892  // all good, task done. Send our result to the master process.
893  logger.debug("worker #%d done here, sending result to master", worker_id);
894 
895  FullTaskResult wresult(task_id, task_result, error_msg);
896 
897  comm.send(0, // destination
898  TAG_WORKER_SUBMIT_RESULT,
899  wresult);
900 
901  // we won't be needing this any longer, it's been serialized & transmitted
902  // to master
903  if (task_result != NULL) {
904  delete task_result;
905  }
906 
907  // make sure there is no pending status report order which we could pick up
908  // when starting the next task
909 
910  auto maybeorderstatreportmsg = comm.iprobe(0, TAG_MASTER_ORDER_STATUS_REPORT);
911  if (maybeorderstatreportmsg) {
912  // got an order -- we can't ignore it, because the master is waiting for
913  // our report
914  auto msg = *maybeorderstatreportmsg;
915  tomographer_assert(msg.tag() == TAG_MASTER_ORDER_STATUS_REPORT);
916  tomographer_assert(msg.source() == 0);
917  logger.longdebug("Receiving an status report order from master ... ");
918  comm.recv(msg.source(), msg.tag());
919 
920  // report that we're idling for now
921  comm.send(0, // report to master
922  TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
923  }
924  }
925 
926  if (error_msg.size()) {
927  throw interrupt_tasks(error_msg);
928  }
929  }
930 
931 
932 public:
937  inline bool isMaster() const { return is_master; }
938 
943  inline TaskCountIntType numTaskRuns() const
944  {
945  tomographer_assert(is_master);
946  tomographer_assert(ctrl != NULL);
947  return ctrl->num_total_runs;
948  }
949 
956  {
957  tomographer_assert(is_master);
958  return ctrl->task_results;
959  }
960 
966  inline const TaskResultType & collectedTaskResult(std::size_t k) const
967  {
968  tomographer_assert(is_master);
969  tomographer_assert(ctrl != NULL) ;
970  tomographer_assert(k >= 0 && k < ctrl->task_results.size()) ;
971  return *ctrl->task_results[k];
972  }
973 
974 
987  template<typename Fn>
988  inline void setStatusReportHandler(Fn fnstatus)
989  {
990  tomographer_assert(is_master);
991  ctrl_status_report->user_fn = fnstatus;
992  }
993 
1007  inline void requestStatusReport()
1008  {
1009  tomographer_assert(is_master);
1010 
1011  ++ ctrl_status_report->event_counter; // will be picked up by master worker
1012  }
1013 
1023  template<typename IntType>
1024  inline void requestPeriodicStatusReport(IntType milliseconds)
1025  {
1026  tomographer_assert(is_master);
1027 
1028  ctrl_status_report->periodic_interval = milliseconds; // will be picked up by master worker
1029  }
1030 
1039  inline void requestInterrupt()
1040  {
1041  tomographer_assert(is_master);
1042 
1043  ctrl->interrupt_requested = 1; // will be picked up by master worker
1044  }
1045 
1046 }; // class TaskDispatcher
1047 
1048 
1049 
1050 template<typename TaskType_, typename TaskCData_,
1051  typename BaseLoggerType_, typename TaskCountIntType_ = int>
1053 mkTaskDispatcher(TaskCData_ * pcdata_, mpi::communicator & comm_, BaseLoggerType_ & baselogger_,
1054  TaskCountIntType_ num_total_runs_)
1055 {
1057  pcdata_, comm_, baselogger_, num_total_runs_
1058  );
1059 }
1060 
1061 
1062 
1063 } // namespace Sequential
1064 } // namespace MultiProc
1065 } // namespace Tomographer
1066 
1067 
1068 #endif
Utilities for formatting strings.
void requestInterrupt()
Interrupt all tasks as soon as possible.
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
bool isMaster() const
Whether we are the master process.
Definition: multiprocmpi.h:937
Base namespace for the Tomographer project.
Definition: densellh.h:45
T to_string(T... args)
T duration_cast(T... args)
void setStatusReportHandler(Fn fnstatus)
assign a callable to be called whenever a status report is requested
Definition: multiprocmpi.h:988
TaskDispatcher(TaskCData *pcdata_, mpi::communicator &comm_, BaseLoggerType &logger_, int num_task_runs)
Construct the task dispatcher around the given MPI communicator.
Definition: multiprocmpi.h:318
#define TOMO_ORIGIN
Use this as argument for a Tomographer::Logger::LocalLogger constructor .
Definition: loggers.h:1608
LocalLogger< LocalLogger< BaseLoggerType > > subLogger(std::string new_prefix)
Create a sub-logger.
Definition: loggers.h:1846
STL class.
Handles parallel execution of tasks using MPI.
Definition: multiprocmpi.h:99
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
Definition: multiprocmpi.h:943
#define TOMO_STATIC_ASSERT_EXPR(...)
Tool for static assertions without message.
Definition: cxxdefs.h:77
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Definition: multiprocmpi.h:955
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
STL class.
T move(T... args)
void requestStatusReport()
Request a status report.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
T size(T... args)
Some common definitions for multiprocessing interfaces.
T c_str(T... args)
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
Definition: multiprocmpi.h:966
STL class.
#define tomographer_assert(...)
Assertion test macro.
Definition: cxxdefs.h:84
A complete status report, abstract version.
Definition: multiproc.h:97