28 #ifndef MPI_MULTIPROCMPI_H 29 #define MPI_MULTIPROCMPI_H 38 #include <boost/exception_ptr.hpp> 39 #include <boost/exception/diagnostic_information.hpp> 40 #include <boost/serialization/serialization.hpp> 47 #include <boost/mpi/environment.hpp> 48 #include <boost/mpi/communicator.hpp> 49 #include <boost/mpi/collectives.hpp> 51 namespace mpi = boost::mpi;
97 template<
typename TaskType_,
typename TaskCData_,
typename BaseLoggerType_,
98 typename TaskCountIntType_ =
int>
102 typedef TaskType_ TaskType;
103 typedef typename TaskType::StatusReportType TaskStatusReportType;
104 typedef TaskCData_ TaskCData;
105 typedef BaseLoggerType_ BaseLoggerType;
106 typedef TaskCountIntType_ TaskCountIntType;
108 typedef typename TaskType::ResultType TaskResultType;
120 struct FullTaskResult
122 FullTaskResult(TaskCountIntType task_id_ = -1, TaskResultType * t = NULL,
124 : task_id(task_id_), task_result(t), error_msg(errmsg) { }
126 TaskCountIntType task_id;
127 TaskResultType * task_result;
131 friend boost::serialization::access;
132 template<
typename Archive>
133 void serialize(Archive & a,
unsigned int )
141 struct MasterWorkersController {
142 MasterWorkersController(TaskCountIntType num_total_runs_)
143 : num_total_runs(num_total_runs_),
144 num_tasks_completed(0),
145 num_tasks_launched(0),
146 num_workers_running(0),
149 interrupt_requested(0),
150 interrupt_reacted(0),
156 inline void start(
int num_workers)
159 num_tasks_completed = 0;
160 num_tasks_launched = 0;
161 num_workers_running = num_workers;
162 workers_running.resize((
std::size_t)num_workers, 0);
164 interrupt_requested = 0;
165 interrupt_reacted = 0;
167 full_task_results.resize((
std::size_t)num_total_runs, NULL);
168 task_results.resize((
std::size_t)num_total_runs, NULL);
170 tasks_start_time = StdClockType::now();
173 inline TaskCountIntType pop_task()
175 if (num_tasks_launched >= num_total_runs) {
178 TaskCountIntType task_id = num_tasks_launched;
179 ++num_tasks_launched;
183 bool get_interrupt_event_and_react()
185 if (interrupt_requested) {
186 interrupt_reacted = 1;
192 const TaskCountIntType num_total_runs;
193 TaskCountIntType num_tasks_completed;
194 TaskCountIntType num_tasks_launched;
196 int num_workers_running;
199 StdClockType::time_point tasks_start_time;
208 struct MasterStatusReportController {
209 MasterStatusReportController()
211 reacted_event_counter(0),
212 in_preparation(
false),
214 num_reports_waiting(0),
215 last_report_time(StdClockType::now()),
221 bool get_event_and_react() {
222 if (in_preparation) {
225 if (event_counter != reacted_event_counter) {
226 reacted_event_counter = event_counter;
233 in_preparation =
false;
234 full_report = FullStatusReportType();
235 num_reports_waiting = 0;
243 FullStatusReportType full_report;
244 int num_reports_waiting;
246 StdClockType::time_point last_report_time;
248 FullStatusReportCallbackType user_fn;
249 int periodic_interval;
252 struct TaskMgrIface {
254 : dispatcher(dispatcher_)
259 inline bool statusReportRequested()
const 261 return dispatcher->do_bookkeeping();
264 inline void submitStatusReport(
const TaskStatusReportType &statreport)
266 dispatcher->submit_status_report(statreport);
275 _TAG_offset_number = 199,
277 TAG_WORKER_REQUEST_NEW_TASK_ID,
278 TAG_MASTER_DELIVER_NEW_TASK_ID,
280 TAG_MASTER_ORDER_INTERRUPT,
281 TAG_MASTER_ORDER_STATUS_REPORT,
283 TAG_WORKER_SUBMIT_STATUS_REPORT,
284 TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT,
285 TAG_WORKER_SUBMIT_RESULT,
287 TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE
291 mpi::communicator & comm;
292 const bool is_master;
296 MasterWorkersController * ctrl;
297 MasterStatusReportController * ctrl_status_report;
299 TaskMgrIface mgriface;
305 interrupt_tasks(
std::string msg_ =
"Task Interrupted") : msg(msg_) { }
306 virtual ~interrupt_tasks()
throw() { }
307 const char * what()
const throw() {
return msg.
c_str(); }
318 TaskDispatcher(TaskCData * pcdata_, mpi::communicator & comm_, BaseLoggerType & logger_,
int num_task_runs)
321 is_master(comm_.rank() == 0),
322 llogger(
"Tomographer::MultiProc::MPI::TaskDispatcher", logger_),
324 ctrl_status_report(NULL),
328 ctrl =
new MasterWorkersController(num_task_runs) ;
329 ctrl_status_report =
new MasterStatusReportController;
335 for (
auto r : ctrl->full_task_results ) {
337 if (r->task_result != NULL) {
338 delete r->task_result;
361 mpi::broadcast(comm, pcdata, 0);
364 logger.longdebug(
"pcdata is now broadcast; is_master=%c", (is_master?
'y':
'n'));
369 ctrl->start(comm.size());
375 const auto worker_id = comm.rank();
378 stream <<
"Worker #" << worker_id <<
" up and running ...";
381 bool interrupted =
false;
388 }
catch (interrupt_tasks & e) {
391 error_msg = e.what();
399 logger.debug(
"master done here, waiting for other processes to finish");
402 -- ctrl->num_workers_running;
404 while ( ctrl->num_workers_running ) {
406 logger.longdebug(
"num_workers_running = %d", ctrl->num_workers_running);
409 }
catch(interrupt_tasks & e) {
413 TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
416 for (
std::size_t k = 0; k < ctrl->full_task_results.size(); ++k) {
417 FullTaskResult * r = ctrl->full_task_results[k];
418 if (r->error_msg.size()) {
419 error_msg +=
"\nIn Worker #" +
std::to_string(k) +
":\n" + r->error_msg;
426 comm.send(0, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
435 logger.debug(
"all done");
439 friend struct TaskMgrIface;
441 inline bool do_bookkeeping()
445 bool status_report_requested =
false;
449 if (ctrl->get_interrupt_event_and_react()) {
451 master_order_interrupt();
452 throw interrupt_tasks();
456 if (!ctrl->interrupt_requested) {
461 if ( ctrl_status_report->get_event_and_react()) {
463 logger.longdebug(
"Status report requested, initiating");
465 status_report_requested = master_initiate_status_report();
467 }
else if ( ctrl_status_report->periodic_interval > 0 &&
468 ( std::chrono::duration_cast<std::chrono::milliseconds>(
469 StdClockType::now() - ctrl_status_report->last_report_time
470 ).count() > ctrl_status_report->periodic_interval ) ) {
474 logger.longdebug(
"Time for a new status report, initiating");
476 status_report_requested = master_initiate_status_report();
482 master_regular_worker_monitoring();
489 auto maybeorderinterruptmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
490 if (maybeorderinterruptmsg) {
492 auto msg = *maybeorderinterruptmsg;
495 logger.longdebug(
"Receiving an interrupt order from master ... ");
496 comm.recv(msg.source(), msg.tag());
499 throw interrupt_tasks(
"");
502 auto maybeorderstatreportmsg = comm.iprobe(0, TAG_MASTER_ORDER_STATUS_REPORT);
503 if (maybeorderstatreportmsg) {
505 auto msg = *maybeorderstatreportmsg;
508 logger.longdebug(
"Receiving an status report order from master ... ");
509 comm.recv(msg.source(), msg.tag());
512 status_report_requested =
true;
518 return status_report_requested;
521 inline void submit_status_report(
const TaskStatusReportType &statreport)
525 master_handle_incoming_worker_status_report(0, &statreport);
529 TAG_WORKER_SUBMIT_STATUS_REPORT,
535 inline bool master_initiate_status_report()
540 if (ctrl_status_report->in_preparation) {
541 logger.longdebug(
"Skipping this report, we're still working on the previous one");
545 ctrl_status_report->in_preparation =
true;
546 ctrl_status_report->num_reports_waiting = 0;
547 ctrl_status_report->full_report = FullStatusReportType();
548 ctrl_status_report->full_report.num_completed = ctrl->num_tasks_completed;
549 ctrl_status_report->full_report.num_total_runs = ctrl->num_total_runs;
551 StdClockType::now() - ctrl->tasks_start_time
556 ctrl_status_report->full_report.workers_running.clear();
557 ctrl_status_report->full_report.workers_reports.clear();
558 ctrl_status_report->full_report.workers_running.resize(num_workers,
false);
559 ctrl_status_report->full_report.workers_reports.resize(num_workers);
562 for (
int worker_id = 1; worker_id < comm.size(); ++worker_id) {
563 if (ctrl->workers_running[(
std::size_t)worker_id]) {
565 comm.send(worker_id, TAG_MASTER_ORDER_STATUS_REPORT) ;
566 ++ ctrl_status_report->num_reports_waiting;
569 if (ctrl->workers_running[0]) {
571 ++ ctrl_status_report->num_reports_waiting;
577 inline void master_order_interrupt()
587 for (
int k = 1; k < comm.size(); ++k) {
589 comm.send(k, TAG_MASTER_ORDER_INTERRUPT) ;
594 inline void master_regular_worker_monitoring()
600 {
auto maybenewtaskmsg = comm.iprobe(mpi::any_source, TAG_WORKER_REQUEST_NEW_TASK_ID);
601 if (maybenewtaskmsg) {
602 logger.longdebug(
"Treating a new task id request message ... ");
603 auto msg = *maybenewtaskmsg;
605 comm.recv(msg.source(), msg.tag());
608 TaskCountIntType task_id = master_get_new_task_id(msg.source());
609 comm.send(msg.source(), TAG_MASTER_DELIVER_NEW_TASK_ID, task_id);
614 {
auto mayberesultmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_RESULT);
615 if (mayberesultmsg) {
616 logger.longdebug(
"Treating a result message ... ");
617 auto msg = *mayberesultmsg;
620 FullTaskResult * result =
new FullTaskResult;
621 logger.longdebug(
"Receiving a worker's result from #%d ... ", msg.source());
622 comm.recv(msg.source(), msg.tag(), *result);
624 logger.longdebug(
"Got result.");
626 master_store_task_result(msg.source(), result);
631 {
auto maybestatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_STATUS_REPORT);
633 logger.longdebug(
"Treating a status report message ... ");
634 auto msg = *maybestatmsg;
637 TaskStatusReportType stat;
638 logger.longdebug(
"Receiving a worker's status report from #%d ... ", msg.source());
639 comm.recv(msg.source(), msg.tag(), stat);
641 master_handle_incoming_worker_status_report(msg.source(), &stat);
646 {
auto maybeistatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
648 logger.longdebug(
"Treating a status report message ... ");
649 auto msg = *maybeistatmsg;
652 logger.longdebug(
"Receiving a worker's idle status report from #%d ... ", msg.source());
653 comm.recv(msg.source(), msg.tag());
655 master_handle_incoming_worker_status_report(msg.source(), NULL);
660 {
auto mmsg = comm.iprobe(mpi::any_source, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
662 logger.longdebug(
"Treating a worker's farewell message ... ");
666 comm.recv(msg.source(), msg.tag());
667 logger.debug(
"Received worker #%d's farewell message. Bye, you did a great job!", msg.source());
670 -- ctrl->num_workers_running;
677 inline TaskCountIntType master_get_new_task_id(
int worker_id)
683 TaskCountIntType task_id = ctrl->pop_task();
691 stream <<
"Got new task_id = " << task_id <<
" for worker #" << worker_id;
695 stream <<
"num_workers_running now = " << ctrl->num_workers_running
696 <<
", all workers_running = ";
697 for (
std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
701 stream << ctrl->workers_running[k];
709 inline void master_store_task_result(
int worker_id, FullTaskResult * result)
716 TaskCountIntType task_id = result->task_id;
719 stream <<
"Got result from #" << worker_id <<
", task_id=" << task_id;
725 ctrl->full_task_results[(
std::size_t)task_id] = result;
726 ctrl->task_results[(
std::size_t)task_id] = result->task_result;
728 ++ ctrl->num_tasks_completed;
734 stream <<
"num_workers_running now = " << ctrl->num_workers_running
735 <<
", all workers_running = ";
736 for (
std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
740 stream << ctrl->workers_running[k];
744 if (result->error_msg.size()) {
747 ctrl->interrupt_requested =
true;
750 logger.debug(
"Saved into results.");
753 inline void master_handle_incoming_worker_status_report(
int worker_id,
const TaskStatusReportType * stat)
758 logger.longdebug(
"incoming report from worker_id=%d", worker_id);
760 --ctrl_status_report->num_reports_waiting;
763 ctrl_status_report->full_report.workers_running[(
std::size_t)worker_id] = 1;
764 ctrl_status_report->full_report.workers_reports[(
std::size_t)worker_id] = *stat;
768 ctrl_status_report->full_report.workers_running[(
std::size_t)worker_id] = 0;
771 if (ctrl_status_report->num_reports_waiting <= 0) {
773 logger.longdebug(
"Status report is ready to be sent");
774 if (ctrl_status_report->user_fn) {
775 logger.longdebug(
"Calling status report user function");
776 ctrl_status_report->user_fn(ctrl_status_report->full_report) ;
779 ctrl_status_report->reset();
780 ctrl_status_report->last_report_time = StdClockType::now();
781 logger.longdebug(
"Status report finished");
786 inline void run_worker()
790 auto worker_id = comm.rank();
792 TaskCountIntType new_task_id = -1;
797 new_task_id = master_get_new_task_id(0);
799 stream <<
"Master worker: got new task id = " << new_task_id ;
802 logger.debug(
"Requesting a new task id from master") ;
805 TAG_WORKER_REQUEST_NEW_TASK_ID);
807 TAG_MASTER_DELIVER_NEW_TASK_ID,
810 stream <<
"Worker #" << worker_id <<
": got new task id = " << new_task_id ;
814 if (new_task_id < 0) {
825 run_task(new_task_id);
828 logger.debug(
"Worker #%d done treating tasks.", worker_id) ;
832 auto mmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
838 logger.longdebug(
"Receiving (belatedly) an interrupt order from master ... ");
839 comm.recv(msg.source(), msg.tag());
842 throw interrupt_tasks(
"");
847 inline void run_task(TaskCountIntType task_id)
853 TaskResultType * task_result = NULL;
857 auto input = pcdata->getTaskInput(task_id);
860 TaskType t(
std::move(input), pcdata, logger.parentLogger());
863 t.run(pcdata, logger.parentLogger(), &mgriface);
866 task_result =
new TaskResultType(t.stealResult());
868 }
catch (interrupt_tasks & e) {
870 error_msg = e.what();
875 boost::diagnostic_information(boost::current_exception());
883 FullTaskResult * fullresult =
new FullTaskResult(task_id, task_result, error_msg);
886 master_store_task_result(0, fullresult);
890 int worker_id = comm.rank();
893 logger.debug(
"worker #%d done here, sending result to master", worker_id);
895 FullTaskResult wresult(task_id, task_result, error_msg);
898 TAG_WORKER_SUBMIT_RESULT,
903 if (task_result != NULL) {
910 auto maybeorderstatreportmsg = comm.iprobe(0, TAG_MASTER_ORDER_STATUS_REPORT);
911 if (maybeorderstatreportmsg) {
914 auto msg = *maybeorderstatreportmsg;
917 logger.longdebug(
"Receiving an status report order from master ... ");
918 comm.recv(msg.source(), msg.tag());
922 TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
926 if (error_msg.
size()) {
927 throw interrupt_tasks(error_msg);
947 return ctrl->num_total_runs;
958 return ctrl->task_results;
971 return *ctrl->task_results[k];
987 template<
typename Fn>
991 ctrl_status_report->user_fn = fnstatus;
1011 ++ ctrl_status_report->event_counter;
1023 template<
typename IntType>
1028 ctrl_status_report->periodic_interval = milliseconds;
1043 ctrl->interrupt_requested = 1;
1050 template<
typename TaskType_,
typename TaskCData_,
1051 typename BaseLoggerType_,
typename TaskCountIntType_ =
int>
1053 mkTaskDispatcher(TaskCData_ * pcdata_, mpi::communicator & comm_, BaseLoggerType_ & baselogger_,
1054 TaskCountIntType_ num_total_runs_)
1057 pcdata_, comm_, baselogger_, num_total_runs_
Utilities for formatting strings.
void requestInterrupt()
Interrupt all tasks as soon as possible.
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
bool isMaster() const
Whether we are the master process.
Base namespace for the Tomographer project.
T duration_cast(T... args)
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
TaskDispatcher(TaskCData *pcdata_, mpi::communicator &comm_, BaseLoggerType &logger_, int num_task_runs)
Construct the task dispatcher around the given MPI communicator.
#define TOMO_ORIGIN
Use this as an argument for a Tomographer::Logger::LocalLogger constructor.
LocalLogger< LocalLogger< BaseLoggerType > > subLogger(std::string new_prefix)
Create a sub-logger.
Handles parallel execution of tasks using MPI.
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
#define TOMO_STATIC_ASSERT_EXPR(...)
Tool for static assertions without message.
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
void requestStatusReport()
Request a status report.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
#define tomographer_assert(...)
Assertion test macro.
A complete status report, abstract version.