source: src/Fragmentation/Automation/Pool/WorkerPool.cpp@ 3eb035

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 3eb035 was 3eb035, checked in by Frederik Heber <heber@…>, 13 years ago

FIX: Removed channel WorkerRemoved in WorkerPool as it may cause cyclic updates in FragmentScheduler.

  • If jobs are already present in the queue and an idle worker arrives, then marking a worker busy will cause an update inside the callback that is used to send the job to the new idle worker. This will trigger the same notification that is not yet removed to be called in notifyAll(). This causes a cycle in the updates.
  • Property mode set to 100644
File size: 6.3 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * WorkerPool.cpp
10 *
11 * Created on: 22.02.2012
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include "WorkerPool.hpp"
26
27#include "CodePatterns/Assert.hpp"
28#include "CodePatterns/Info.hpp"
29#include "CodePatterns/Log.hpp"
30#include "CodePatterns/Observer/Channels.hpp"
31#include "Connection.hpp"
32
33WorkerPool::priority_t WorkerPool::default_priority = 0;
34WorkerAddress WorkerPool::emptyAddress("empty", "empty");
35
36/** Constructor for class WorkerPool.
37 *
38 */
39WorkerPool::WorkerPool() :
40 Observable("WorkerPool")
41{
42 // observable stuff
43 Channels *OurChannel = new Channels;
44 NotificationChannels.insert( std::make_pair(this, OurChannel) );
45 // add instance for each notification type
46 for (size_t type = 0; type < NotificationType_MAX; ++type)
47 OurChannel->addChannel(type);
48}
49
50/** Destructor for class WorkerPool.
51 *
52 */
53WorkerPool::~WorkerPool()
54{}
55
56/** Helper function to check whether an address is already in the pool.
57 *
58 * @param address worker address to check
59 * @return true - address is present, false - else
60 */
61bool WorkerPool::presentInPool(const WorkerAddress &address) const
62{
63 return pool.find(address) != pool.end();
64}
65
66/** Get address of next idle worker.
67 *
68 * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
69 *
70 * @return address of idle worker
71 */
72WorkerAddress WorkerPool::getNextIdleWorker()
73{
74 // get first idle worker
75 ASSERT( presentIdleWorkers(),
76 "WorkerPool::getNextIdleWorker() - there is no idle worker.");
77 if (!presentIdleWorkers())
78 return emptyAddress;
79 Idle_Queue_t::iterator iter = idle_queue.begin();
80 const WorkerAddress returnaddress = iter->second;
81
82 // enter in busy queue
83 markWorkerBusy( iter );
84
85 // return address
86 return returnaddress;
87}
88
89WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
90{
91 Idle_Queue_t::iterator idleiter = idle_queue.begin();
92 while (idleiter != idle_queue.end()) {
93 if (idleiter->second == address) {
94 break;
95 }
96 ++idleiter;
97 }
98 return idleiter;
99}
100
101/** Checks whether a worker is busy or not.
102 *
103 * @param address address of worker to check
104 */
105bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
106{
107 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
108 if (iter != busy_queue.end())
109 return true;
110#ifndef NDEBUG
111 else {
112 Idle_Queue_t::const_iterator iter = idle_queue.begin();
113 for(;iter != idle_queue.end(); ++iter)
114 if (iter->second == address)
115 break;
116 ASSERT( iter != idle_queue.end(),
117 "WorkerPool::isWorkerBusy() - worker "+toString(address)
118 +" is neither busy nor idle.");
119
120 }
121#endif
122 return false;
123}
124
125/** Adds another worker to the pool by noting down its address.
126 *
127 * @param address host and service address of the listening worker
128 * @return true - added successfully, false - not added
129 */
130bool WorkerPool::addWorker(const WorkerAddress& address)
131{
132 OBSERVE;
133 NOTIFY(WorkerAdded);
134 std::pair<Pool_t::iterator, bool> inserter =
135 pool.insert( address );
136 if (inserter.second) { // if new also add to queue
137 LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
138 idle_queue.insert( make_pair( default_priority, address ) );
139 NOTIFY(WorkerIdle);
140 return true;
141 } else {
142 LOG(1, "INFO: "+toString(address)+" is already present pool.");
143 return false;
144 }
145}
146
147/** Removes a worker from the pool.
148 *
149 * @param address host and service address of the listening worker
150 * @return true - removed successfully, false - not removed
151 */
152bool WorkerPool::removeWorker(const WorkerAddress& address)
153{
154 Pool_t::iterator iter = pool.find( address );
155 if (iter != pool.end()) {
156 Idle_Queue_t::iterator idleiter = getIdleWorker(address);
157 if (idleiter != idle_queue.end())
158 idle_queue.erase(idleiter);
159 Busy_Queue_t::iterator busyiter = busy_queue.find(address);
160 if (busyiter != busy_queue.end())
161 busy_queue.erase(busyiter);
162 ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
163 "WorkerPool::removeWorker() - Worker "+toString(address)
164 +" is in pool but neither idle nor busy!");
165 ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
166 "WorkerPool::removeWorker() - Worker "+toString(address)
167 +" is in pool and both idle and busy!");
168 pool.erase(iter);
169 LOG(1, "INFO: Removed worker " << address << " from pool.");
170 return true;
171 } else {
172 ELOG(1, "Worker "+toString(address)+" is not present pool.");
173 return false;
174 }
175}
176
177/** Sends shutdown to all current workers in the pool.
178 *
179 */
180void WorkerPool::removeAllWorkers()
181{
182 // empty pool and queue
183 idle_queue.clear();
184 busy_queue.clear();
185 pool.clear();
186}
187
188/** Helper function to mark a worker as busy.
189 *
190 * Removes from idle_queue and places into busy_queue.
191 * Sets \a iter to Idle_Queue_t::end().
192 *
193 * @param iter iterator on idle worker
194 */
195void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
196{
197 const WorkerAddress returnaddress = iter->second;
198 if (isWorkerBusy(returnaddress))
199 return;
200 const priority_t priority = iter->first;
201
202 // remove from idle queue
203 idle_queue.erase(iter);
204
205 // insert into busy queue
206#ifndef NDEBUG
207 std::pair< Busy_Queue_t::iterator, bool > inserter =
208#endif
209 busy_queue.insert( make_pair(returnaddress, priority) );
210 ASSERT( inserter.second,
211 "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
212
213 LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
214}
215
216/** Helper function to unmark a worker as busy.
217 *
218 * Removes worker from busy_queue and returns it to idle_queue.
219 *
220 * @param address address of worker
221 */
222void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
223{
224 if (isWorkerBusy(address)) {
225 OBSERVE;
226 NOTIFY(WorkerIdle);
227 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
228 const priority_t priority = iter->second;
229 busy_queue.erase(address);
230 idle_queue.insert( make_pair( priority, address) );
231
232 LOG(1, "INFO: Worker " << address << " is now marked idle.");
233 }
234}
Note: See TracBrowser for help on using the repository browser.