38 #include <boost/serialization/serialization.hpp> 72 : fraction_done(0), msg(
"<unknown>")
74 TaskStatusReport(
double fraction_done_,
std::string msg_)
75 : fraction_done(fraction_done_), msg(
std::move(msg_))
82 friend class boost::serialization::access;
83 template<
typename Archive>
84 void serialize(Archive & a,
unsigned int )
96 template<
typename TaskStatusReportType,
typename TaskCountIntType>
152 double f = num_completed;
154 if (workers_running[k]) {
156 f += workers_reports[k].fraction_done;
160 return f / num_total_runs;
170 (std::chrono::milliseconds::rep)(elapsed*1000)
173 ss <<
"=========================== Intermediate Progress Report ============================\n" 175 << elapsed_s <<
"s elapsed" 177 << num_completed <<
"/" << num_total_runs <<
" runs completed" 182 if (workers_running.
size() == 0) {
184 }
else if (workers_running.
size() == 1) {
185 if (workers_running[0]) {
186 ss <<
"--> " << workers_reports[0].msg <<
"\n";
189 ss <<
"Current Run(s) information (workers working/spawned " 191 <<
"/" << workers_running.
size() <<
"):\n";
193 ss <<
"=== " <<
std::setw(2) << k <<
": ";
194 if (!workers_running[k]) {
197 ss << workers_reports[k].msg <<
"\n";
201 ss <<
"=====================================================================================\n";
207 friend class boost::serialization::access;
208 template<
typename Archive>
209 void serialize(Archive & a,
unsigned int )
/// Returns the explanatory message held in msg_ as a NUL-terminated C string.
/// The pointer is obtained from msg_.c_str(); presumably msg_ is a std::string
/// member, so the pointer is only usable while this object is alive and msg_ is
/// not modified -- confirm against the class definition.
/// Declared noexcept, which replaces the deprecated dynamic exception
/// specification throw() (removed in C++20) with the same non-throwing contract.
227 const char * what()
const noexcept {
return msg_.c_str(); }
232 namespace Sequential {
253 template<
typename TaskType_,
typename TaskCData_,
254 typename LoggerType_,
typename TaskCountIntType_ =
int>
258 typedef TaskType_ TaskType;
259 typedef typename TaskType::StatusReportType TaskStatusReportType;
260 typedef TaskCData_ TaskCData;
261 typedef LoggerType_ LoggerType;
262 typedef TaskCountIntType_ TaskCountIntType;
266 typedef typename TaskType::ResultType TaskResultType;
274 const TaskCData * pcdata;
278 TaskCountIntType num_total_runs;
283 TaskCountIntType task_k;
286 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 287 std::chrono::monotonic_clock
294 struct TaskMgrIface {
296 : dispatcher(dispatcher_),
297 interrupt_requested(0),
298 status_report_requested(0),
299 status_report_user_fn(),
300 _tasks_start_time(StdClockType::now()),
301 _last_status_report(StdClockType::now()),
302 _status_report_periodic_interval(0)
311 FullStatusReportCallbackType status_report_user_fn;
313 const StdClockType::time_point _tasks_start_time;
314 StdClockType::time_point _last_status_report;
315 StdClockType::duration _status_report_periodic_interval;
319 inline void _request_status_report() {
320 status_report_requested = 1;
322 inline void _request_interrupt() {
323 interrupt_requested = 1;
325 template<
typename IntType>
326 inline void _request_periodic_status_report(IntType milliseconds) {
327 if ( milliseconds >= 0 ) {
332 _status_report_periodic_interval = StdClockType::duration(0);
337 inline bool statusReportRequested()
const 339 if (interrupt_requested) {
342 if (_status_report_periodic_interval.count() > 0
343 && (StdClockType::now() - (_last_status_report + _status_report_periodic_interval)).count() > 0) {
346 return (
bool) status_report_requested;
349 inline void submitStatusReport(
const TaskStatusReportType &statreport)
351 FullStatusReportType fullstatus;
368 StdClockType::now() - _tasks_start_time
371 status_report_user_fn(
std::move(fullstatus));
373 status_report_requested =
false;
374 _last_status_report = StdClockType::now();
379 TaskMgrIface mgriface;
382 TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_, TaskCountIntType num_total_runs_)
383 : pcdata(pcdata_), results(), logger(logger_), num_total_runs(num_total_runs_),
388 for (
auto r : results ) {
403 logger.debug(
"MultiProc::Sequential::TaskDispatcher::run()",
"preparing for sequential runs");
405 for (task_k = 0; task_k < num_total_runs; ++task_k) {
407 logger.debug(
"Tomographer::MultiProc::Sequential::TaskDispatcher::run()",
408 [&](
std::ostream & stream) { stream <<
"Running task #" << task_k <<
" ..."; });
410 auto input = pcdata->getTaskInput(task_k);
413 TaskType t(input, pcdata, logger);
416 t.run(pcdata, logger, &mgriface);
419 logger.longdebug(
"MultiProc::Sequential::TaskDispatcher::run()",
"collecting result");
420 results[(
std::size_t)task_k] =
new TaskResultType(t.stealResult());
424 logger.debug(
"MultiProc::Sequential::TaskDispatcher::run()",
"all done");
/// Returns the value of the num_total_runs member, i.e. the total number of
/// task runs this dispatcher was constructed with. Read-only accessor.
430 inline TaskCountIntType
numTaskRuns()
const {
return num_total_runs; }
455 template<
typename Fn>
458 mgriface.status_report_user_fn = fnstatus;
474 mgriface._request_status_report();
484 template<
typename IntType>
487 mgriface._request_periodic_status_report(milliseconds);
498 mgriface._request_interrupt();
505 template<
typename TaskType_,
typename TaskCData_,
506 typename LoggerType_,
typename TaskCountIntType_ =
int>
508 mkTaskDispatcher(TaskCData_ * pcdata_, LoggerType_ & logger_, TaskCountIntType_ num_total_runs_)
511 pcdata_, logger_, num_total_runs_
Utilities for formatting strings.
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Base namespace for the Tomographer project.
std::vector< TaskStatusReportType, typename Tools::NeedOwnOperatorNew< TaskStatusReportType >::AllocatorType > workers_reports
List with the raw report submitted from each individual thread.
std::vector< bool > workers_running
List specifying for each worker (e.g. a spawned thread) whether it is active or not.
TaskCountIntType num_completed
Number of completed tasks.
std::string getHumanReport() const
Produce a text-based human-readable short representation of the status report.
T duration_cast(T... args)
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
double totalFractionDone() const
The total fraction of the job completed.
Basic status report class.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
TaskCountIntType num_total_runs
Total number of tasks to perform.
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
Executes multiple tasks sequentially.
T setprecision(T... args)
void requestStatusReport()
Request a status report.
double elapsed
Number of seconds elapsed since launching the tasks.
void requestInterrupt()
Interrupt all tasks as soon as possible.
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
A complete status report, abstract version.