source: src/Fragmentation/Automation/FragmentQueue.cpp@ fe95b7

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since fe95b7 was fe95b7, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentQueue can now resubmit jobs.

  • we keep an internal list of jobs currently being worked on. If the worker returns with a failure, they can be resubmitted to the queue.
  • a failed job is resubmitted at most Max_Attempts (currently set to 1) times.
  • added unit test function on this.
  • added regression test Fragmentation/Automation resubmitjobs.
  • Property mode set to 100644
File size: 8.2 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * FragmentQueue.cpp
10 *
11 * Created on: Oct 19, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20#include "CodePatterns/MemDebug.hpp"
21
22#include "FragmentQueue.hpp"
23
24#include "CodePatterns/Assert.hpp"
25#include "CodePatterns/Log.hpp"
26
27FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) );
28FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) );
29FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) );
30size_t FragmentQueue::Max_Attempts = (size_t)1;
31
32/** Constructor for class FragmentQueue.
33 *
34 */
35FragmentQueue::FragmentQueue()
36{}
37
38/** Destructor for class FragmentQueue.
39 *
40 */
41FragmentQueue::~FragmentQueue()
42{
43 jobs.clear();
44 results.clear();
45}
46
47/** Checks whether there are jobs in the queue at all.
48 * \return true - jobs present, false - queue is empty
49 */
50bool FragmentQueue::isJobPresent() const
51{
52 return !jobs.empty();
53}
54
55/** Pushes a FragmentJob into the internal queue for delivery to server.
56 *
57 * \note we throw assertion when jobid has already been used.
58 *
59 * \param job job to enter into queue
60 */
61void FragmentQueue::pushJob(FragmentJob::ptr job)
62{
63 ASSERT(job->getId() != JobId::IllegalJob,
64 "FragmentQueue::pushJob() - job to push has IllegalJob id.");
65 ASSERT(!results.count(job->getId()),
66 "FragmentQueue::pushJob() - job id "+toString(job->getId())+" has already been used.");
67 results.insert( std::make_pair(job->getId(), NoResult));
68 jobs.push_back(job);
69}
70
71/** Pushes a bunch of FragmentJob's into the internal queue for delivery to server.
72 *
73 * \note we throw assertion when jobid has already been used.
74 *
75 * \sa pushJob()
76 *
77 * \param _jobs jobs to enter into queue
78 */
79void FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs)
80{
81 for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin();
82 iter != _jobs.end(); ++iter)
83 pushJob(*iter);
84}
85
86/** Pops top-most FragmentJob from internal queue.
87 *
88 * From here on, we expect a result in FragmentQueue::results.
89 *
90 * \return job topmost job from queue
91 */
92FragmentJob::ptr FragmentQueue::popJob()
93{
94 ASSERT(jobs.size(),
95 "FragmentQueue::popJob() - there are no jobs on the queue.");
96 FragmentJob::ptr job = jobs.front();
97#ifndef NDEBUG
98 std::pair< BackupMap::iterator, bool> inserter =
99#endif
100 backup.insert( std::make_pair( job->getId(), job ));
101 ASSERT (inserter.second,
102 "FragmentQueue::popJob() - job "+toString(job->getId())+
103 " is already in the backup.");
104 ResultMap::iterator iter = results.find(job->getId());
105 ASSERT(iter != results.end(),
106 "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored.");
107 iter->second = NoResultQueued;
108 jobs.pop_front();
109 return job;
110}
111
112/** Internal function to check whether result is not one of static entities.
113 *
114 * @param result result to check against
115 * @return true - result is a present, valid result, false - result is one of the statics
116 */
117bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const
118{
119 return (*result != *NoResult)
120 && (*result != *NoResultQueued)
121 && (*result != *ResultDelivered);
122}
123
124/** Queries whether a job has already been finished and the result is present.
125 *
126 * \param jobid id of job to query
127 * \return true - result is present, false - result is not present
128 */
129bool FragmentQueue::isResultPresent(JobId_t jobid) const
130{
131 ResultMap::const_iterator iter = results.find(jobid);
132 return ((iter != results.end())
133 && isPresentResult(iter->second));
134}
135
136/** Counts the number of jobs for which we have a calculated result present.
137 *
138 * \return number of calculated results
139 */
140size_t FragmentQueue::getDoneJobs() const
141{
142 size_t doneJobs = 0;
143 for (ResultMap::const_iterator iter = results.begin();
144 iter != results.end(); ++iter)
145 if (isPresentResult(iter->second))
146 ++doneJobs;
147 return doneJobs;
148}
149
150/** Counts the number of jobs for which still have to be calculated.
151 *
152 * \return number of jobs to be calculated
153 */
154size_t FragmentQueue::getPresentJobs() const
155{
156 const size_t presentJobs = jobs.size();
157 return presentJobs;
158}
159
160/** Delivers result for a finished job.
161 *
162 * \note we throw assertion if not present
163 *
164 * \param jobid id of job
165 * \return result for job of given \a jobid
166 */
167FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid)
168{
169 ResultMap::iterator iter = results.find(jobid);
170 ASSERT(iter != results.end(),
171 "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us.");
172 ASSERT(*iter->second != *NoResult,
173 "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet.");
174 ASSERT(*iter->second != *NoResultQueued,
175 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet.");
176 ASSERT(*iter->second != *ResultDelivered,
177 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered.");
178 /// store result
179 FragmentResult::ptr _result = iter->second;
180 /// mark as delivered in map
181 iter->second = ResultDelivered;
182 /// and return result
183 return _result;
184}
185
186std::vector<FragmentResult::ptr> FragmentQueue::getAllResults()
187{
188 std::vector<FragmentResult::ptr> returnresults;
189 for (ResultMap::iterator iter = results.begin();
190 iter != results.end(); ++iter) {
191 if (isPresentResult(iter->second)) {
192 returnresults.push_back(getResult(iter->first));
193 iter = results.begin();
194 }
195 }
196
197 return returnresults;
198}
199
200/** Pushes a result for a finished job.
201 *
202 * \note we throw assertion if job already has result or is not known.
203 *
204 * \param result result of job to store
205 */
206void FragmentQueue::pushResult(FragmentResult::ptr &_result)
207{
208 const JobId_t id = _result->getId();
209 /// check for presence
210 ResultMap::iterator iter = results.find(id);
211 ASSERT(iter != results.end(),
212 "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us.");
213 ASSERT(*iter->second == *NoResultQueued,
214 "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+".");
215 // check whether this is a resubmitted job
216 AttemptsMap::iterator attemptiter = attempts.find(id);
217 // check whether succeeded or (finally) failed
218 if ((_result->exitflag == 0) || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))) {
219 // give notice if it is resubmitted job
220 if (attemptiter != attempts.end()) {
221 if (attemptiter->second >= Max_Attempts)
222 ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time.");
223 else
224 LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt.");
225 }
226 // remove in attempts
227 if (attemptiter != attempts.end())
228 attempts.erase(attemptiter);
229 // remove in backup map
230 BackupMap::iterator backupiter = backup.find(id);
231 ASSERT( backupiter != backup.end(),
232 "FragmentQueue::pushResult() - cannot find job "+toString(id)
233 +" in backup.");
234 backup.erase(backupiter);
235 /// and overwrite NoResult in found entry
236 iter->second = _result;
237 } else {
238 LOG(1, "Job " << id << " failed, resubmitting.");
239 // increase attempts
240 if (attemptiter != attempts.end())
241 ++(attemptiter->second);
242 else
243 attempts.insert( std::make_pair(id, (size_t)1) );
244 // resubmit job
245 resubmitJob(id);
246 }
247}
248
249/** Resubmit a job which a worker failed to calculate.
250 *
251 * @param jobid id of the failed job
252 */
253void FragmentQueue::resubmitJob(const JobId_t jobid)
254{
255 BackupMap::iterator iter = backup.find(jobid);
256 ASSERT( iter != backup.end(),
257 "FragmentQueue::resubmitJob() - job id "+toString(jobid)
258 +" not stored in backup.");
259 if (iter != backup.end()) {
260 // remove result
261 ResultMap::iterator resiter = results.find(jobid);
262 ASSERT( resiter != results.end(),
263 "FragmentQueue::resubmitJob() - resubmitting job "+toString(jobid)
264 +" for which no result is present.");
265 results.erase(resiter);
266 pushJob(iter->second);
267 backup.erase(iter);
268 }
269}
270
Note: See TracBrowser for help on using the repository browser.