69 : fraction_done(0), msg(
"<unknown>")
71 TaskStatusReport(
double fraction_done_,
std::string msg_)
72 : fraction_done(fraction_done_), msg(
std::move(msg_))
84 template<
typename TaskStatusReportType,
typename TaskCountIntType>
140 double f = num_completed;
142 if (workers_running[k]) {
144 f += workers_reports[k].fraction_done;
148 return f / num_total_runs;
158 (std::chrono::milliseconds::rep)(elapsed*1000)
161 ss <<
"=========================== Intermediate Progress Report ============================\n" 163 << elapsed_s <<
"s elapsed" 165 << num_completed <<
"/" << num_total_runs <<
" runs completed" 170 if (workers_running.
size() == 0) {
172 }
else if (workers_running.
size() == 1) {
173 if (workers_running[0]) {
174 ss <<
"--> " << workers_reports[0].msg <<
"\n";
177 ss <<
"Current Run(s) information (workers working/spawned " 179 <<
"/" << workers_running.
size() <<
"):\n";
181 ss <<
"=== " <<
std::setw(2) << k <<
": ";
182 if (!workers_running[k]) {
185 ss << workers_reports[k].msg <<
"\n";
189 ss <<
"=====================================================================================\n";
202 const char * what()
const throw() {
return msg_.c_str(); }
207 namespace Sequential {
228 template<
typename TaskType_,
typename TaskCData_,
229 typename LoggerType_,
typename TaskCountIntType_ =
int>
233 typedef TaskType_ TaskType;
234 typedef typename TaskType::StatusReportType TaskStatusReportType;
235 typedef TaskCData_ TaskCData;
236 typedef LoggerType_ LoggerType;
237 typedef TaskCountIntType_ TaskCountIntType;
241 typedef typename TaskType::ResultType TaskResultType;
249 const TaskCData * pcdata;
253 TaskCountIntType num_total_runs;
258 TaskCountIntType task_k;
261 #if defined(__GNUC__) && __GNUC__ == 4 && __GNUC_MINOR__ <= 6 && !defined(__clang__) 262 std::chrono::monotonic_clock
269 struct TaskMgrIface {
271 : dispatcher(dispatcher_),
272 interrupt_requested(0),
273 status_report_requested(0),
274 status_report_user_fn(),
275 _tasks_start_time(StdClockType::now()),
276 _last_status_report(StdClockType::now()),
277 _status_report_periodic_interval(0)
286 FullStatusReportCallbackType status_report_user_fn;
288 const StdClockType::time_point _tasks_start_time;
289 StdClockType::time_point _last_status_report;
290 StdClockType::duration _status_report_periodic_interval;
294 inline void _request_status_report() {
295 status_report_requested = 1;
297 inline void _request_interrupt() {
298 interrupt_requested = 1;
300 template<
typename IntType>
301 inline void _request_periodic_status_report(IntType milliseconds) {
302 if ( milliseconds >= 0 ) {
307 _status_report_periodic_interval = StdClockType::duration(0);
312 inline bool statusReportRequested()
const 314 if (interrupt_requested) {
317 if (_status_report_periodic_interval.count() > 0
318 && (StdClockType::now() - (_last_status_report + _status_report_periodic_interval)).count() > 0) {
321 return (
bool) status_report_requested;
324 inline void submitStatusReport(
const TaskStatusReportType &statreport)
326 FullStatusReportType fullstatus;
343 StdClockType::now() - _tasks_start_time
346 status_report_user_fn(
std::move(fullstatus));
348 status_report_requested =
false;
349 _last_status_report = StdClockType::now();
354 TaskMgrIface mgriface;
357 TaskDispatcher(TaskCData * pcdata_, LoggerType & logger_, TaskCountIntType num_total_runs_)
358 : pcdata(pcdata_), results(), logger(logger_), num_total_runs(num_total_runs_),
363 for (
auto r : results ) {
378 logger.debug(
"MultiProc::Sequential::TaskDispatcher::run()",
"preparing for sequential runs");
380 for (task_k = 0; task_k < num_total_runs; ++task_k) {
382 logger.debug(
"Tomographer::MultiProc::Sequential::TaskDispatcher::run()",
383 [&](
std::ostream & stream) { stream <<
"Running task #" << task_k <<
" ..."; });
385 auto input = pcdata->getTaskInput(task_k);
388 TaskType t(input, pcdata, logger);
391 t.run(pcdata, logger, &mgriface);
394 logger.longdebug(
"MultiProc::Sequential::TaskDispatcher::run()",
"collecting result");
395 results[(
std::size_t)task_k] =
new TaskResultType(t.stealResult());
399 logger.debug(
"MultiProc::Sequential::TaskDispatcher::run()",
"all done");
405 inline TaskCountIntType
numTaskRuns()
const {
return num_total_runs; }
430 template<
typename Fn>
433 mgriface.status_report_user_fn = fnstatus;
449 mgriface._request_status_report();
459 template<
typename IntType>
462 mgriface._request_periodic_status_report(milliseconds);
473 mgriface._request_interrupt();
480 template<
typename TaskType_,
typename TaskCData_,
481 typename LoggerType_,
typename TaskCountIntType_ =
int>
483 mkTaskDispatcher(TaskCData_ * pcdata_, LoggerType_ & logger_, TaskCountIntType_ num_total_runs_)
486 pcdata_, logger_, num_total_runs_
Utilities for formatting strings.
const std::vector< TaskResultType * > & collectedTaskResults() const
Returns the results of all the tasks.
Base namespace for the Tomographer project.
std::vector< TaskStatusReportType, typename Tools::NeedOwnOperatorNew< TaskStatusReportType >::AllocatorType > workers_reports
List with the raw report submitted from each individual thread.
std::vector< bool > workers_running
List specifying for each worker (e.g. a spawned thread) whether it is active or not.
TaskCountIntType num_completed
Number of completed tasks.
std::string getHumanReport() const
Produce a text-based human-readable short representation of the status report.
T duration_cast(T... args)
void requestPeriodicStatusReport(IntType milliseconds)
Request a status report periodically.
double totalFractionDone() const
The total fraction of the job completed.
Basic status report class.
Managing the need for specific overrides to operator new() for some types (especially Eigen types) ...
TaskCountIntType num_total_runs
Total number of tasks to perform.
TaskCountIntType numTaskRuns() const
The total number of task instances that were run.
Executes multiple tasks sequentially.
T setprecision(T... args)
void requestStatusReport()
Request a status report.
double elapsed
Number of seconds elapsed since launching the tasks.
void requestInterrupt()
Interrupt all tasks as soon as possible.
void setStatusReportHandler(Fn fnstatus)
Assign a callable to be called whenever a status report is requested.
const TaskResultType & collectedTaskResult(std::size_t k) const
Returns the result of the given task.
A complete status report, abstract version.