28 #ifndef MPI_MULTIPROCMPI_H 29 #define MPI_MULTIPROCMPI_H 38 #include <boost/exception_ptr.hpp> 39 #include <boost/exception/diagnostic_information.hpp> 40 #include <boost/serialization/serialization.hpp> 41 #include <boost/serialization/vector.hpp> 48 #include <boost/mpi/environment.hpp> 49 #include <boost/mpi/communicator.hpp> 50 #include <boost/mpi/collectives.hpp> 52 namespace mpi = boost::mpi;
70 namespace tomo_internal {
75 template<
typename ChronoDurationType>
82 friend boost::serialization::access;
// Save half of the split (save/load) serialization of the duration wrapper.
// Copies the duration's tick count, obtained through
// ChronoDurationType::count(), into a local of the duration's `rep` type.
// The statement that hands that count to the archive `ar` is not visible in
// this excerpt.  The version argument is unnamed and unused here.
83 template<
typename Archive>
84 void save(Archive & ar,
const unsigned int )
const 86 typename ChronoDurationType::rep thecount = ChronoDurationType::count();
// Load half of the split (save/load) serialization of the duration wrapper.
// Declares a tick count of the duration's `rep` type and then re-assigns
// *this from a ChronoDurationType constructed from that count.
// The statement that reads the count from the archive `ar` is not visible in
// this excerpt.  The version argument is unnamed and unused here.
89 template<
typename Archive>
90 void load(Archive & ar,
const unsigned int )
92 typename ChronoDurationType::rep thecount;
94 *
this = ChronoDurationType(thecount);
96 BOOST_SERIALIZATION_SPLIT_MEMBER()
// Template parameters of the dispatcher class, followed by the type aliases
// that re-export them.  TaskCountIntType_ defaults to int.
131 template<
typename TaskType_,
typename TaskCData_,
typename BaseLoggerType_,
132 typename TaskCountIntType_ =
int>
136 typedef TaskType_ TaskType;
// The status report type is the task type's nested StatusReportType.
137 typedef typename TaskType::StatusReportType TaskStatusReportType;
138 typedef TaskCData_ TaskCData;
139 typedef BaseLoggerType_ BaseLoggerType;
140 typedef TaskCountIntType_ TaskCountIntType;
// The result type is the task type's nested ResultType.
142 typedef typename TaskType::ResultType TaskResultType;
// Outcome of a single task as it is stored by the master and sent over MPI:
// the task id, a raw pointer to the task's result, and an error message
// (initialized from the constructor's `errmsg` argument).
154 struct FullTaskResult
// Default arguments visible here: task_id_ = -1 and a NULL result pointer.
156 FullTaskResult(TaskCountIntType task_id_ = -1, TaskResultType * t = NULL,
158 : task_id(task_id_), task_result(t), error_msg(errmsg) { }
160 TaskCountIntType task_id;
// NOTE(review): raw pointer; other code in this file deletes task_result
// explicitly, so ownership appears to be manual — confirm before changing.
161 TaskResultType * task_result;
// Lets boost::serialization reach the serialize() member template below.
165 friend boost::serialization::access;
166 template<
typename Archive>
167 void serialize(Archive & a,
unsigned int )
// Bookkeeping state for a run: the fixed total number of task runs, counters
// of launched/completed tasks, the number of workers still running, the
// interrupt request/reaction flags and the time at which the tasks started.
175 struct MasterWorkersController {
// Stores the total number of runs; all counters and interrupt flags start at 0.
176 MasterWorkersController(TaskCountIntType num_total_runs_)
177 : num_total_runs(num_total_runs_),
178 num_tasks_completed(0),
179 num_tasks_launched(0),
180 num_workers_running(0),
183 interrupt_requested(0),
184 interrupt_reacted(0),
// (Re)initializes the state for a run with `num_workers` workers: zeroes the
// task counters and interrupt flags, records the worker count, resizes
// workers_running to one slot per worker (new slots are 0), resizes both
// result vectors to num_total_runs (new slots are NULL) and stamps the start
// time with StdClockType::now().
190 inline void start(
int num_workers)
193 num_tasks_completed = 0;
194 num_tasks_launched = 0;
195 num_workers_running = num_workers;
196 workers_running.resize((
std::size_t)num_workers, 0);
198 interrupt_requested = 0;
199 interrupt_reacted = 0;
201 full_task_results.resize((
std::size_t)num_total_runs, NULL);
202 task_results.resize((
std::size_t)num_total_runs, NULL);
204 tasks_start_time = StdClockType::now();
// Hands out the next task id: the id is the current value of
// num_tasks_launched, which is then incremented.  The branch taken once
// num_tasks_launched has reached num_total_runs has no visible body here
// (callers test the returned id with `< 0`, so it presumably returns a
// negative value — confirm).
207 inline TaskCountIntType pop_task()
209 if (num_tasks_launched >= num_total_runs) {
212 TaskCountIntType task_id = num_tasks_launched;
213 ++num_tasks_launched;
// When an interrupt has been requested, records that it has been reacted to
// (interrupt_reacted = 1).  The return statements are not visible here.
217 bool get_interrupt_event_and_react()
219 if (interrupt_requested) {
220 interrupt_reacted = 1;
// Total number of task runs; const, fixed at construction.
226 const TaskCountIntType num_total_runs;
227 TaskCountIntType num_tasks_completed;
228 TaskCountIntType num_tasks_launched;
230 int num_workers_running;
// Set by start(); used to compute the elapsed time in status reports.
233 StdClockType::time_point tasks_start_time;
// State used to assemble a full status report: request/reaction event
// counters, an "in preparation" flag, the report being built, the number of
// worker reports still awaited, the time of the next periodic report, the
// user callback and the periodic interval.
242 struct MasterStatusReportController {
// Visible initializers: reacted_event_counter = 0, in_preparation = false,
// num_reports_waiting = 0, next_report_time = StdClockType::now().
243 MasterStatusReportController()
245 reacted_event_counter(0),
246 in_preparation(
false),
248 num_reports_waiting(0),
249 next_report_time(StdClockType::now()),
// Checks for a pending status report request.  A separate branch exists for
// the case where a report is already in preparation (body not visible).
// When event_counter differs from reacted_event_counter, the latter is
// brought up to date.  The return statements are not visible here.
255 bool get_event_and_react() {
256 if (in_preparation) {
259 if (event_counter != reacted_event_counter) {
260 reacted_event_counter = event_counter;
// Returns the controller to its idle state: no report in preparation, a
// default-constructed full report and no reports awaited.  The header of
// this function is not visible; presumably this is the body of reset().
267 in_preparation =
false;
268 full_report = FullStatusReportType();
269 num_reports_waiting = 0;
277 FullStatusReportType full_report;
278 int num_reports_waiting;
280 StdClockType::time_point next_report_time;
// Callback invoked with the full report once it is complete.
282 FullStatusReportCallbackType user_fn;
// Periodic reports are only considered when this value is > 0.
// NOTE(review): the setter's parameter is named `milliseconds`; the unit is
// presumably ms — confirm.
283 int periodic_interval;
// Thin interface object handed to running tasks (it is passed to the task's
// run() call); it stores a pointer to the dispatcher and forwards every call
// to it.
286 struct TaskMgrIface {
288 : dispatcher(dispatcher_)
// Forwards to the dispatcher's MPI message check, which also reports whether
// a status report has been requested.
293 inline bool statusReportRequested()
const 295 return dispatcher->check_mpi_messages_and_get_statusrequested();
// Forwards the task's status report to the dispatcher.
298 inline void submitStatusReport(
const TaskStatusReportType &statreport)
300 dispatcher->submit_status_report(statreport);
// Message tags used with comm.send / comm.recv / comm.iprobe in this file.
// The enclosing declaration is not visible; presumably these are enumerators
// of an enum, in which case the tags take consecutive values after 199.
309 _TAG_offset_number = 199,
// Worker asks the master for a task id / master answers with one.
311 TAG_WORKER_REQUEST_NEW_TASK_ID,
312 TAG_MASTER_DELIVER_NEW_TASK_ID,
// Orders sent by the master to the workers.
314 TAG_MASTER_ORDER_INTERRUPT,
315 TAG_MASTER_ORDER_STATUS_REPORT,
// Messages sent by the workers to the master.
317 TAG_WORKER_SUBMIT_STATUS_REPORT,
318 TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT,
319 TAG_WORKER_SUBMIT_RESULT,
// Farewell message a worker sends to rank 0 when it is done.
321 TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE
325 mpi::communicator & comm;
326 const bool is_master;
330 MasterWorkersController * ctrl;
331 MasterStatusReportController * ctrl_status_report;
334 StdClockType::time_point next_mpi_probe_time;
336 TaskMgrIface mgriface;
// Exception type thrown in this file to signal that tasks are interrupted.
// Carries a message (default "Task Interrupted") that what() returns as a
// C string.  The struct header and the `msg` member declaration are not
// visible in this excerpt.
342 interrupt_tasks(
std::string msg_ =
"Task Interrupted") : msg(msg_) { }
343 virtual ~interrupt_tasks()
throw() { }
// The returned pointer refers to the storage of `msg`.
344 const char * what()
const throw() {
return msg.
c_str(); }
// Constructs the dispatcher around the given MPI communicator.
//  - The process whose rank in comm_ is 0 is the master.
//  - The local logger is created with the origin string
//    "Tomographer::MultiProc::MPI::TaskDispatcher" on top of logger_.
//  - min_mpi_probe_dt starts at 100 ms (converted to StdClockType::duration)
//    and next_mpi_probe_time at the current time.
//  - ctrl_status_report starts as NULL.
355 TaskDispatcher(TaskCData * pcdata_, mpi::communicator & comm_, BaseLoggerType & logger_,
359 is_master(comm_.rank() == 0),
360 llogger(
"Tomographer::MultiProc::MPI::TaskDispatcher", logger_),
362 ctrl_status_report(NULL),
363 min_mpi_probe_dt(
std::chrono::duration_cast<StdClockType::duration>(
std::chrono::milliseconds(100))),
364 next_mpi_probe_time(StdClockType::now()),
// Allocates the two controller objects with raw `new`.
// NOTE(review): the condition guarding these allocations is not visible
// (presumably is_master) — confirm, and confirm the matching deletes.
368 ctrl =
new MasterWorkersController(num_task_runs) ;
369 ctrl_status_report =
new MasterStatusReportController;
// Cleanup loop over the stored FullTaskResult pointers: deletes each entry's
// task_result when that pointer is non-NULL.  The enclosing function header
// is not visible; presumably this is part of the destructor.
// NOTE(review): the slots of full_task_results are NULL-initialized in
// MasterWorkersController::start() and only filled when a result arrives, so
// `r` itself may be NULL.  A line between the two below is missing from this
// excerpt — confirm it null-checks `r` before `r->task_result` is read.
375 for (
auto r : ctrl->full_task_results ) {
377 if (r->task_result != NULL) {
378 delete r->task_result;
// Main run sequence (the function header is not visible in this excerpt).
// First, pcdata and min_mpi_probe_dt are broadcast with root rank 0.
401 mpi::broadcast(comm, pcdata, 0);
403 mpi::broadcast(comm, min_mpi_probe_dt, 0);
// Restart the MPI probe throttle from the current time.
407 next_mpi_probe_time = StdClockType::now();
409 logger.longdebug(
"pcdata is now broadcast; is_master=%c; min_mpi_probe_dt=%u",
411 (
unsigned int)std::chrono::duration_cast<std::chrono::milliseconds>(min_mpi_probe_dt).count());
// One worker per MPI process: the controller is started with comm.size().
416 ctrl->start(comm.size());
422 const auto worker_id = comm.rank();
425 stream <<
"Worker #" << worker_id <<
" up and running ...";
428 bool interrupted =
false;
435 }
catch (interrupt_tasks & e) {
// Keep the message of the interruption.
438 error_msg = e.what();
446 logger.debug(
"master done here, waiting for other processes to finish");
// The master stops counting itself as a running worker, then keeps handling
// MPI messages until the running-worker count drops to zero.
449 -- ctrl->num_workers_running;
451 while ( ctrl->num_workers_running ) {
453 logger.longdebug(
"num_workers_running = %d", ctrl->num_workers_running);
455 check_mpi_messages_and_get_statusrequested();
456 }
catch(interrupt_tasks & e) {
460 TOMOGRAPHER_SLEEP_FOR_MS( 100 ) ;
// Collect the error messages of all stored results into error_msg.
// NOTE(review): slots of full_task_results are NULL-initialized in
// MasterWorkersController::start() and only filled when a result is stored;
// after an interruption some slots may still be NULL, and `r->error_msg` is
// read here without a visible null check — confirm / guard.
// NOTE(review): `k` indexes full_task_results, which is filled by task_id in
// master_store_task_result(), yet the message labels it "Worker #" — confirm
// the intended wording.
463 for (
std::size_t k = 0; k < ctrl->full_task_results.size(); ++k) {
464 FullTaskResult * r = ctrl->full_task_results[k];
465 if (r->error_msg.size()) {
466 error_msg +=
"\nIn Worker #" +
std::to_string(k) +
":\n" + r->error_msg;
// Farewell message sent to rank 0 (the enclosing condition is not visible;
// presumably the non-master path).
473 comm.send(0, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
482 logger.debug(
"all done");
486 friend struct TaskMgrIface;
// Polls for pending work/messages and returns whether a status report has
// been requested from this process.
//
// `force` (default false) bypasses the time throttle below.
// May throw interrupt_tasks when an interrupt is pending.
489 inline bool check_mpi_messages_and_get_statusrequested(
bool force =
false)
493 auto now = StdClockType::now();
// Throttle: unless forced, nothing is probed before next_mpi_probe_time (the
// body of this branch is not visible; presumably an early return).
494 if (!force && now < next_mpi_probe_time) {
497 next_mpi_probe_time = now + min_mpi_probe_dt;
500 bool status_report_requested =
false;
// --- Handling that uses the master-side controllers (the enclosing
// condition is not visible; presumably is_master). ---
// A pending interrupt is relayed to the workers and then raised locally.
504 if (ctrl->get_interrupt_event_and_react()) {
506 master_order_interrupt();
507 throw interrupt_tasks();
511 if (!ctrl->interrupt_requested) {
// A status report starts either on explicit request, or when a periodic
// interval is set (> 0) and next_report_time has passed.
516 if ( ctrl_status_report->get_event_and_react()) {
518 logger.longdebug(
"Status report requested, initiating");
520 status_report_requested = master_initiate_status_report();
522 }
else if ( ctrl_status_report->periodic_interval > 0 && ( now > ctrl_status_report->next_report_time ) ) {
525 logger.longdebug(
"Time for a new status report, initiating");
533 status_report_requested = master_initiate_status_report();
// Treat incoming worker messages.
538 master_regular_worker_monitoring();
// --- Handling of orders coming from rank 0. ---
545 logger.longdebug(
"Checking for MPI messages from master");
// Non-blocking probe for an interrupt order; if present, consume the
// (payload-free) message and raise interrupt_tasks with an empty message.
547 auto maybeorderinterruptmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
548 if (maybeorderinterruptmsg) {
550 auto msg = *maybeorderinterruptmsg;
553 logger.longdebug(
"Receiving an interrupt order from master ... ");
554 comm.recv(msg.source(), msg.tag());
557 throw interrupt_tasks(
"");
// Non-blocking probe for a status report order; if present, consume the
// message and remember that a report is requested.
560 auto maybeorderstatreportmsg = comm.iprobe(0, TAG_MASTER_ORDER_STATUS_REPORT);
561 if (maybeorderstatreportmsg) {
563 auto msg = *maybeorderstatreportmsg;
566 logger.longdebug(
"Receiving an status report order from master ... ");
567 comm.recv(msg.source(), msg.tag());
570 status_report_requested =
true;
575 logger.longdebug(
"status_report_requested = %c", status_report_requested?
'Y':
'n') ;
577 return status_report_requested;
// Delivers the status report of the task running in this process.
// On the master path the report is handled directly, as worker id 0, without
// any MPI message; on the other path it is sent with tag
// TAG_WORKER_SUBMIT_STATUS_REPORT (the head of that send call and the branch
// condition are not visible in this excerpt).
580 inline void submit_status_report(
const TaskStatusReportType &statreport)
585 logger.longdebug(
"Master here: handling our own status report");
586 master_handle_incoming_worker_status_report(0, &statreport);
588 logger.longdebug(
"Sending status report to master");
591 TAG_WORKER_SUBMIT_STATUS_REPORT,
// Reports that this process has no task-level status to give.
// On the master path this is handled directly as worker id 0 with a NULL
// report pointer; on the other path a message with tag
// TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT is sent (the head of that send call
// and the branch condition are not visible in this excerpt).
596 inline void submit_idle_status_report()
600 master_handle_incoming_worker_status_report(0, NULL);
602 logger.longdebug(
"Sending idle status report to master");
604 TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT) ;
// Starts the collection of a new full status report.
// The boolean result is assigned by the caller to its
// "status report requested" flag; the return statements themselves are not
// visible in this excerpt.
609 inline bool master_initiate_status_report()
// Only one report is assembled at a time.
614 if (ctrl_status_report->in_preparation) {
615 logger.longdebug(
"Skipping this report, we're still working on the previous one");
// Begin a fresh report and fill in the global progress figures.
619 ctrl_status_report->in_preparation =
true;
620 ctrl_status_report->num_reports_waiting = 0;
621 ctrl_status_report->full_report = FullStatusReportType();
622 ctrl_status_report->full_report.num_completed = ctrl->num_tasks_completed;
623 ctrl_status_report->full_report.num_total_runs = ctrl->num_total_runs;
// Time elapsed since the tasks were started.
625 StdClockType::now() - ctrl->tasks_start_time
// One slot per worker (num_workers is defined on a line not visible here);
// workers_running slots start as false.
630 ctrl_status_report->full_report.workers_running.clear();
631 ctrl_status_report->full_report.workers_reports.clear();
632 ctrl_status_report->full_report.workers_running.resize(num_workers,
false);
633 ctrl_status_report->full_report.workers_reports.resize(num_workers);
// Order a report from every running worker of rank >= 1 and count how many
// reports are expected.
636 for (
int worker_id = 1; worker_id < comm.size(); ++worker_id) {
637 if (ctrl->workers_running[(
std::size_t)worker_id]) {
639 comm.send(worker_id, TAG_MASTER_ORDER_STATUS_REPORT) ;
640 ++ ctrl_status_report->num_reports_waiting;
// Worker 0 (this process) is counted as well, without an MPI message.
643 if (ctrl->workers_running[0]) {
645 ++ ctrl_status_report->num_reports_waiting;
// Sends a payload-free TAG_MASTER_ORDER_INTERRUPT message to every rank from
// 1 to comm.size()-1 (rank 0 is skipped).
651 inline void master_order_interrupt()
661 for (
int k = 1; k < comm.size(); ++k) {
663 comm.send(k, TAG_MASTER_ORDER_INTERRUPT) ;
// Treats pending messages from the workers.  Each message kind is looked for
// with a non-blocking probe (comm.iprobe) from any source and, when present,
// received and dispatched to the matching master_* handler.  Several guard
// lines are not visible in this excerpt.
668 inline void master_regular_worker_monitoring()
// 1) Request for a new task id: consume the request, pick an id for the
//    requesting worker and send it back.
679 auto maybenewtaskmsg = comm.iprobe(mpi::any_source, TAG_WORKER_REQUEST_NEW_TASK_ID);
680 if (maybenewtaskmsg) {
681 logger.longdebug(
"Treating a new task id request message ... ");
682 auto msg = *maybenewtaskmsg;
684 comm.recv(msg.source(), msg.tag());
687 TaskCountIntType task_id = master_get_new_task_id(msg.source());
688 comm.send(msg.source(), TAG_MASTER_DELIVER_NEW_TASK_ID, task_id);
// 2) Task result: receive it into a heap-allocated FullTaskResult, which is
//    then passed to master_store_task_result().
698 auto mayberesultmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_RESULT);
699 if (mayberesultmsg) {
700 logger.longdebug(
"Treating a result message ... ");
701 auto msg = *mayberesultmsg;
704 FullTaskResult * result =
new FullTaskResult;
705 logger.longdebug(
"Receiving a worker's result from #%d ... ", msg.source());
706 comm.recv(msg.source(), msg.tag(), *result);
708 logger.longdebug(
"Got result.");
710 master_store_task_result(msg.source(), result);
// 3) Status report from a running task.
720 auto maybestatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_STATUS_REPORT);
722 logger.longdebug(
"Treating a status report message ... ");
723 auto msg = *maybestatmsg;
726 TaskStatusReportType stat;
727 logger.longdebug(
"Receiving a worker's status report from #%d ... ", msg.source());
728 comm.recv(msg.source(), msg.tag(), stat);
730 master_handle_incoming_worker_status_report(msg.source(), &stat);
// 4) Idle status report (no payload); handled with a NULL report pointer.
740 auto maybeistatmsg = comm.iprobe(mpi::any_source, TAG_WORKER_SUBMIT_IDLE_STATUS_REPORT);
742 logger.longdebug(
"Treating a status report message ... ");
743 auto msg = *maybeistatmsg;
746 logger.longdebug(
"Receiving a worker's idle status report from #%d ... ", msg.source());
747 comm.recv(msg.source(), msg.tag());
749 master_handle_incoming_worker_status_report(msg.source(), NULL);
// 5) Farewell message: the worker is done, so one fewer worker is running.
759 auto mmsg = comm.iprobe(mpi::any_source, TAG_WORKER_HELL_YEAH_IM_OUTTA_HERE);
761 logger.longdebug(
"Treating a worker's farewell message ... ");
765 comm.recv(msg.source(), msg.tag());
766 logger.debug(
"Received worker #%d's farewell message. Bye, you did a great job!", msg.source());
769 -- ctrl->num_workers_running;
// Picks the next task id for worker `worker_id` by popping it from the
// controller, and logs the id together with the current worker bookkeeping
// (num_workers_running and every entry of workers_running).
// Any update of workers_running and the return statement are not visible in
// this excerpt.
779 inline TaskCountIntType master_get_new_task_id(
int worker_id)
785 TaskCountIntType task_id = ctrl->pop_task();
793 stream <<
"Got new task_id = " << task_id <<
" for worker #" << worker_id;
797 stream <<
"num_workers_running now = " << ctrl->num_workers_running
798 <<
", all workers_running = ";
799 for (
std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
803 stream << ctrl->workers_running[k];
// Records a finished task's result on the master.
// `worker_id` is only used for logging in the lines visible here; `result`
// is stored as-is (pointer), indexed by its task_id.
811 inline void master_store_task_result(
int worker_id, FullTaskResult * result)
818 TaskCountIntType task_id = result->task_id;
821 stream <<
"Got result from #" << worker_id <<
", task_id=" << task_id;
// Both vectors are indexed by task id; task_results holds the inner result
// pointer of the same object.
// NOTE(review): task_id is cast to std::size_t without a visible range check;
// FullTaskResult defaults task_id to -1 — confirm it is always valid here.
827 ctrl->full_task_results[(
std::size_t)task_id] = result;
828 ctrl->task_results[(
std::size_t)task_id] = result->task_result;
830 ++ ctrl->num_tasks_completed;
836 stream <<
"num_workers_running now = " << ctrl->num_workers_running
837 <<
", all workers_running = ";
838 for (
std::size_t k = 0; k < ctrl->workers_running.size(); ++k) {
842 stream << ctrl->workers_running[k];
// A non-empty error message on the result requests an interrupt of the run.
846 if (result->error_msg.size()) {
849 ctrl->interrupt_requested =
true;
852 logger.debug(
"Saved into results.");
// Integrates one worker's status report into the full report being built.
// `stat` is a pointer to the worker's report; callers in this file pass NULL
// for an idle report.  The conditions selecting between the two branches
// below are not visible in this excerpt.
855 inline void master_handle_incoming_worker_status_report(
int worker_id,
const TaskStatusReportType * stat)
860 logger.longdebug(
"incoming report from worker_id=%d", worker_id);
// One fewer report is awaited.
862 --ctrl_status_report->num_reports_waiting;
// Worker with a report: marked as running and its report is copied in.
865 ctrl_status_report->full_report.workers_running[(
std::size_t)worker_id] = 1;
866 ctrl_status_report->full_report.workers_reports[(
std::size_t)worker_id] = *stat;
// Other branch: the worker is marked as not running.
870 ctrl_status_report->full_report.workers_running[(
std::size_t)worker_id] = 0;
// Once no more reports are awaited, the user callback (if set) receives the
// full report and the controller is reset.
873 if (ctrl_status_report->num_reports_waiting <= 0) {
875 logger.longdebug(
"Status report is ready to be sent");
876 if (ctrl_status_report->user_fn) {
877 logger.longdebug(
"Calling status report user function");
878 ctrl_status_report->user_fn(ctrl_status_report->full_report) ;
881 ctrl_status_report->reset();
882 logger.longdebug(
"Status report finished");
// Worker routine: obtains task ids and runs the corresponding tasks.
// The loop structure and branch conditions are not visible in this excerpt.
// May throw interrupt_tasks.
887 inline void run_worker()
891 auto worker_id = comm.rank();
893 TaskCountIntType new_task_id = -1;
// Master path: the task id is obtained directly, as worker 0.
898 new_task_id = master_get_new_task_id(0);
900 stream <<
"Master worker: got new task id = " << new_task_id ;
// Other path: the id is requested with TAG_WORKER_REQUEST_NEW_TASK_ID and the
// answer arrives with TAG_MASTER_DELIVER_NEW_TASK_ID (the heads of the
// send/recv calls are not visible).
903 logger.debug(
"Requesting a new task id from master") ;
906 TAG_WORKER_REQUEST_NEW_TASK_ID);
908 TAG_MASTER_DELIVER_NEW_TASK_ID,
911 stream <<
"Worker #" << worker_id <<
": got new task id = " << new_task_id ;
// Branch for a negative task id (body not visible; presumably it ends the
// task loop).
915 if (new_task_id < 0) {
// Forced message check; a pending status report request is answered with an
// idle report.
923 bool report_requested = check_mpi_messages_and_get_statusrequested(
true);
924 if (report_requested) {
925 submit_idle_status_report();
929 run_task(new_task_id);
932 logger.debug(
"Worker #%d done treating tasks.", worker_id) ;
// Check for an interrupt order from rank 0 that arrived late: it is received
// and interrupt_tasks is thrown with an empty message.
936 auto mmsg = comm.iprobe(0, TAG_MASTER_ORDER_INTERRUPT);
942 logger.longdebug(
"Receiving (belatedly) an interrupt order from master ... ");
943 comm.recv(msg.source(), msg.tag());
946 throw interrupt_tasks(
"");
// Runs the task with id `task_id` in this process and delivers its result.
// Throws interrupt_tasks(error_msg) at the end when an error message was
// recorded while running the task.  Several lines (try/else/braces) are not
// visible in this excerpt.
951 inline void run_task(TaskCountIntType task_id)
957 TaskResultType * task_result = NULL;
// Build the task from its input and run it; the task receives &mgriface so
// it can check for / submit status reports.
961 auto input = pcdata->getTaskInput(task_id);
964 TaskType t(
std::move(input), pcdata, logger.parentLogger());
967 t.run(pcdata, logger.parentLogger(), &mgriface);
// The result is moved out of the task into a heap-allocated object.
970 task_result =
new TaskResultType(t.stealResult());
972 }
catch (interrupt_tasks & e) {
974 error_msg = e.what();
// For the other handler, the message comes from boost's diagnostic
// information of the current exception.
979 boost::diagnostic_information(boost::current_exception());
// Master path: wrap everything in a heap-allocated FullTaskResult and store
// it directly, as worker 0.
987 FullTaskResult * fullresult =
new FullTaskResult(task_id, task_result, error_msg);
990 master_store_task_result(0, fullresult);
// Other path: send a local FullTaskResult with TAG_WORKER_SUBMIT_RESULT (the
// head of the send call is not visible).
994 int worker_id = comm.rank();
997 logger.debug(
"worker #%d done here, sending result to master", worker_id);
999 FullTaskResult wresult(task_id, task_result, error_msg);
1002 TAG_WORKER_SUBMIT_RESULT,
// Branch for a non-NULL local result (body not visible; presumably it frees
// the local copy after sending — confirm).
1007 if (task_result != NULL) {
// Forced message check; a pending status report request is answered with an
// idle report.
1014 bool report_requested = check_mpi_messages_and_get_statusrequested(
true);
1015 if (report_requested) {
1016 submit_idle_status_report();
// Propagate a recorded error as an interruption.
1021 if (error_msg.
size()) {
1022 throw interrupt_tasks(error_msg);
1040 min_mpi_probe_dt = milliseconds;
1057 return ctrl->num_total_runs;
1068 return ctrl->task_results;
1081 return *ctrl->task_results[k];
1097 template<
typename Fn>
1101 ctrl_status_report->user_fn = fnstatus;
1121 ++ ctrl_status_report->event_counter;
1133 template<
typename IntType>
1138 ctrl_status_report->periodic_interval = milliseconds;
1153 ctrl->interrupt_requested = 1;
// Helper that builds a task dispatcher from the given arguments, which are
// forwarded in the same order (pcdata_, comm_, baselogger_, num_total_runs_).
// TaskCountIntType_ defaults to int.  The return type and the constructed
// expression are not visible in this excerpt.
1160 template<
typename TaskType_,
typename TaskCData_,
1161 typename BaseLoggerType_,
typename TaskCountIntType_ =
int>
1163 mkTaskDispatcher(TaskCData_ * pcdata_, mpi::communicator & comm_, BaseLoggerType_ & baselogger_,
1164 TaskCountIntType_ num_total_runs_)
1167 pcdata_, comm_, baselogger_, num_total_runs_
Utilities for formatting strings.
void requestInterrupt()
Interrupt all tasks as soon as possible.
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
bool isMaster() const
Whether we are the master process.
Base namespace for the Tomographer project.
void setMinMPIProbeDt(int milliseconds)
Tune the frequency at which MPI messages are probed.
T duration_cast(T... args)
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
TaskDispatcher(TaskCData *pcdata_, mpi::communicator &comm_, BaseLoggerType &logger_, int num_task_runs)
Construct the task dispatcher around the given MPI communicator.
#define TOMO_ORIGIN
Use this as the argument for a Tomographer::Logger::LocalLogger constructor.
LocalLogger< LocalLogger< BaseLoggerType > > subLogger(std::string new_prefix)
Create a sub-logger.
Handles parallel execution of tasks using MPI.
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
#define TOMO_STATIC_ASSERT_EXPR(...)
Tool for static assertions without message.
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Basic multiprocessing templates for thread-based Tomographer::MultiProc implementations.
void requestStatusReport()
Request a status report.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
Some common definitions for multiprocessing interfaces.
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
#define tomographer_assert(...)
Assertion test macro.
A complete status report, abstract version.