source: src/Fragmentation/Automation/SpecificFragmentController_ReceiveResultContainer_impl.hpp@ fba720

Candidate_v1.6.1 ChemicalSpaceEvaluator TremoloParser_IncreasedPrecision
Last change on this file since fba720 was 456f19, checked in by Frederik Heber <frederik.heber@…>, 7 years ago

SpecificFragmentController checks whether there are still workers left alive.

  • This avoids the situation where all workers have crashed over a job while the server is still present, which would otherwise leave the action hanging indefinitely.
  • Property mode set to 100644
File size: 3.2 KB
RevLine 
[732507]1/*
[d4eaf1]2 * SpecificFragmentController_ReceiveResultContainer_impl.hpp
[732507]3 *
4 * Created on: Aug 27, 2012
5 * Author: heber
6 */
7
[d4eaf1]8#ifndef SPECIFICFRAGMENTCONTROLLER_RECEIVERESULTCONTAINER_IMPL_HPP_
9#define SPECIFICFRAGMENTCONTROLLER_RECEIVERESULTCONTAINER_IMPL_HPP_
[732507]10
11
12// include config.h
13#ifdef HAVE_CONFIG_H
14#include <config.h>
15#endif
16
17#include "SpecificFragmentController.hpp"
18
19#include "CodePatterns/Assert.hpp"
20#include "CodePatterns/toString.hpp"
21
[456f19]22#include <numeric>
[732507]23
24template <typename T>
[d4eaf1]25size_t SpecificFragmentController::ReceiveResultContainer<T>::receiveResults(
[732507]26 SpecificFragmentController &callback)
27{
28 // receive (and remove the respective ids)
29 callback.receiveResults(callback.host, callback.port);
30 callback.RunService("Requesting results");
31 std::vector<FragmentResult::ptr> fragmentresults = callback.getReceivedResults();
32
33 // convert
34 std::vector<T> fragmentData;
[fe77df]35 SpecificFragmentController::ReceiveResultContainer<T>::ConvertFragmentResultTo(
36 fragmentresults,
37 fragmentData);
[732507]38
39 // insert into map
[fe77df]40 SpecificFragmentController::ReceiveResultContainer<T>::insertResults(
41 fragmentresults,
42 fragmentData);
[d4eaf1]43
[732507]44 return fragmentData.size();
45}
46
47template <typename T>
[d4eaf1]48void SpecificFragmentController::ReceiveResultContainer<T>::waitforResults(
[732507]49 const size_t NoExpectedResults,
50 boost::asio::io_service &io_service,
51 SpecificFragmentController &callback)
52{
53 // wait but receive all results that are already done
54 size_t NoReceivedResults = 0;
[ca09be]55 while ((NoReceivedResults != NoExpectedResults)
56 && (callback.getExitflag() == 0)) {
[732507]57 // wait a bit
58 boost::asio::deadline_timer timer(io_service);
59 timer.expires_from_now(boost::posix_time::milliseconds(500));
60 timer.wait();
61 // then request status
62 callback.checkResults(callback.host, callback.port);
63 callback.RunService("Checking on results");
[ca09be]64 if (callback.getExitflag() != 0)
65 break;
[732507]66
67 const std::pair<size_t, size_t> JobStatus = callback.getJobStatus();
68 const size_t NoCalculatedResults = JobStatus.second;
69 // if some are done, get them
70 if (NoCalculatedResults != 0) {
71 NoReceivedResults += receiveResults(callback);
[d9f2b3]72 callback.handler(NoReceivedResults, NoExpectedResults);
73 LOG(1, "INFO: #" << JobStatus.first << " are waiting in the queue and #" << NoReceivedResults << " of " << NoExpectedResults << " jobs are calculated so far.");
[732507]74 }
[456f19]75
76 // if not all jobs are done yet, check number of present workers
77 if (NoReceivedResults != NoExpectedResults) {
78 callback.checkEnrolledWorkers(callback.host, callback.port);
79 callback.RunService("Checking on number of workers");
80 if (callback.getExitflag() != 0)
81 break;
82 const std::vector<size_t>& NumberWorkers = callback.getNumberOfWorkers();
83 const int TotalNumberWorkers = std::accumulate(
84 NumberWorkers.begin(), NumberWorkers.end(), 0);
85 if (TotalNumberWorkers == 0) {
86 ELOG(0, "Not all jobs are finished, but no workers are present working on it?!");
87 callback.setExitflag(ExitflagContainer::ErrorFlag);
88 break;
89 } else {
90 LOG(4, "There are " << NumberWorkers << " workers enrolled.");
91 }
92 }
93}
[732507]94}
95
[d4eaf1]96#endif /* SPECIFICFRAGMENTCONTROLLER_RECEIVERESULTCONTAINER_IMPL_HPP_ */
Note: See TracBrowser for help on using the repository browser.