source: src/Fragmentation/Automation/SpecificFragmentController_ReceiveResultContainer_impl.hpp

Candidate_v1.6.1
Last change on this file was 456f19, checked in by Frederik Heber <frederik.heber@…>, 7 years ago

SpecificFragmentController checks whether there are still workers left alive.

  • this avoids the action hanging indefinitely when all workers have crashed over a job while the server is still present.
  • Property mode set to 100644
File size: 3.2 KB
Line 
1/*
2 * SpecificFragmentController_ReceiveResultContainer_impl.hpp
3 *
4 * Created on: Aug 27, 2012
5 * Author: heber
6 */
7
8#ifndef SPECIFICFRAGMENTCONTROLLER_RECEIVERESULTCONTAINER_IMPL_HPP_
9#define SPECIFICFRAGMENTCONTROLLER_RECEIVERESULTCONTAINER_IMPL_HPP_
10
11
12// include config.h
13#ifdef HAVE_CONFIG_H
14#include <config.h>
15#endif
16
17#include "SpecificFragmentController.hpp"
18
19#include "CodePatterns/Assert.hpp"
20#include "CodePatterns/toString.hpp"
21
22#include <numeric>
23
24template <typename T>
25size_t SpecificFragmentController::ReceiveResultContainer<T>::receiveResults(
26 SpecificFragmentController &callback)
27{
28 // receive (and remove the respective ids)
29 callback.receiveResults(callback.host, callback.port);
30 callback.RunService("Requesting results");
31 std::vector<FragmentResult::ptr> fragmentresults = callback.getReceivedResults();
32
33 // convert
34 std::vector<T> fragmentData;
35 SpecificFragmentController::ReceiveResultContainer<T>::ConvertFragmentResultTo(
36 fragmentresults,
37 fragmentData);
38
39 // insert into map
40 SpecificFragmentController::ReceiveResultContainer<T>::insertResults(
41 fragmentresults,
42 fragmentData);
43
44 return fragmentData.size();
45}
46
47template <typename T>
48void SpecificFragmentController::ReceiveResultContainer<T>::waitforResults(
49 const size_t NoExpectedResults,
50 boost::asio::io_service &io_service,
51 SpecificFragmentController &callback)
52{
53 // wait but receive all results that are already done
54 size_t NoReceivedResults = 0;
55 while ((NoReceivedResults != NoExpectedResults)
56 && (callback.getExitflag() == 0)) {
57 // wait a bit
58 boost::asio::deadline_timer timer(io_service);
59 timer.expires_from_now(boost::posix_time::milliseconds(500));
60 timer.wait();
61 // then request status
62 callback.checkResults(callback.host, callback.port);
63 callback.RunService("Checking on results");
64 if (callback.getExitflag() != 0)
65 break;
66
67 const std::pair<size_t, size_t> JobStatus = callback.getJobStatus();
68 const size_t NoCalculatedResults = JobStatus.second;
69 // if some are done, get them
70 if (NoCalculatedResults != 0) {
71 NoReceivedResults += receiveResults(callback);
72 callback.handler(NoReceivedResults, NoExpectedResults);
73 LOG(1, "INFO: #" << JobStatus.first << " are waiting in the queue and #" << NoReceivedResults << " of " << NoExpectedResults << " jobs are calculated so far.");
74 }
75
76 // if not all jobs are done yet, check number of present workers
77 if (NoReceivedResults != NoExpectedResults) {
78 callback.checkEnrolledWorkers(callback.host, callback.port);
79 callback.RunService("Checking on number of workers");
80 if (callback.getExitflag() != 0)
81 break;
82 const std::vector<size_t>& NumberWorkers = callback.getNumberOfWorkers();
83 const int TotalNumberWorkers = std::accumulate(
84 NumberWorkers.begin(), NumberWorkers.end(), 0);
85 if (TotalNumberWorkers == 0) {
86 ELOG(0, "Not all jobs are finished, but no workers are present working on it?!");
87 callback.setExitflag(ExitflagContainer::ErrorFlag);
88 break;
89 } else {
90 LOG(4, "There are " << NumberWorkers << " workers enrolled.");
91 }
92 }
93}
94}
95
96#endif /* SPECIFICFRAGMENTCONTROLLER_RECEIVERESULTCONTAINER_IMPL_HPP_ */
Note: See TracBrowser for help on using the repository browser.